// vortex_layout/layouts/file_stats.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
4use std::future;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use itertools::Itertools;
9use parking_lot::Mutex;
10use vortex_array::ArrayRef;
11use vortex_array::LEGACY_SESSION;
12use vortex_array::ToCanonical as _;
13use vortex_array::VortexSessionExecute;
14use vortex_array::arrays::struct_::StructArrayExt;
15use vortex_array::dtype::DType;
16use vortex_array::dtype::Nullability;
17use vortex_array::expr::stats::Stat;
18use vortex_array::stats::StatsSet;
19use vortex_error::VortexExpect;
20use vortex_error::VortexResult;
21use vortex_error::vortex_panic;
22
23use crate::layouts::zoned::zone_map::StatsAccumulator;
24use crate::sequence::SendableSequentialStream;
25use crate::sequence::SequenceId;
26use crate::sequence::SequentialStreamAdapter;
27use crate::sequence::SequentialStreamExt;
28
29pub fn accumulate_stats(
30    stream: SendableSequentialStream,
31    stats: Arc<[Stat]>,
32    max_variable_length_statistics_size: usize,
33) -> (FileStatsAccumulator, SendableSequentialStream) {
34    let accumulator =
35        FileStatsAccumulator::new(stream.dtype(), stats, max_variable_length_statistics_size);
36    let stream = SequentialStreamAdapter::new(
37        stream.dtype().clone(),
38        stream.scan(accumulator.clone(), |acc, item| {
39            future::ready(Some(acc.process(item)))
40        }),
41    )
42    .sendable();
43    (accumulator, stream)
44}
45
46/// An array stream processor that computes aggregate statistics for all fields.
47///
48/// Note: for now this only collects top-level struct fields.
49#[derive(Clone)]
50pub struct FileStatsAccumulator {
51    stats: Arc<[Stat]>,
52    accumulators: Arc<Mutex<Vec<StatsAccumulator>>>,
53}
54
55impl FileStatsAccumulator {
56    fn new(dtype: &DType, stats: Arc<[Stat]>, max_variable_length_statistics_size: usize) -> Self {
57        let accumulators = Arc::new(Mutex::new(match dtype.as_struct_fields_opt() {
58            Some(struct_dtype) => {
59                if dtype.nullability() == Nullability::Nullable {
60                    // top level dtype could be nullable, but we don't support it yet
61                    vortex_panic!(
62                        "FileStatsAccumulator temporarily does not support nullable top-level structs, got: {}. Use Validity::NonNullable",
63                        dtype
64                    );
65                }
66
67                struct_dtype
68                    .fields()
69                    .map(|field_dtype| {
70                        StatsAccumulator::new(
71                            &field_dtype,
72                            &stats,
73                            max_variable_length_statistics_size,
74                        )
75                    })
76                    .collect()
77            }
78            None => [StatsAccumulator::new(
79                dtype,
80                &stats,
81                max_variable_length_statistics_size,
82            )]
83            .into(),
84        }));
85
86        Self {
87            stats,
88            accumulators,
89        }
90    }
91
92    fn process(
93        &self,
94        chunk: VortexResult<(SequenceId, ArrayRef)>,
95    ) -> VortexResult<(SequenceId, ArrayRef)> {
96        let (sequence_id, chunk) = chunk?;
97        if chunk.dtype().is_struct() {
98            let chunk = chunk.to_struct();
99            for (acc, field) in self
100                .accumulators
101                .lock()
102                .iter_mut()
103                .zip_eq(chunk.iter_unmasked_fields())
104            {
105                acc.push_chunk(field)?;
106            }
107        } else {
108            self.accumulators.lock()[0].push_chunk(&chunk)?;
109        }
110        Ok((sequence_id, chunk))
111    }
112
113    pub fn stats_sets(&self) -> Vec<StatsSet> {
114        let mut ctx = LEGACY_SESSION.create_execution_ctx();
115        self.accumulators
116            .lock()
117            .iter_mut()
118            .map(|acc| {
119                acc.as_stats_table()
120                    .vortex_expect("as_stats_table should not fail")
121                    .map(|table| {
122                        table
123                            .to_stats_set(&self.stats, &mut ctx)
124                            .vortex_expect("shouldn't fail to convert table we just created")
125                    })
126                    .unwrap_or_default()
127            })
128            .collect()
129    }
130}