Skip to main content

vortex_layout/layouts/
file_stats.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use itertools::Itertools;
9use parking_lot::Mutex;
10use vortex_array::ArrayRef;
11use vortex_array::ExecutionCtx;
12use vortex_array::VortexSessionExecute;
13use vortex_array::arrays::StructArray;
14use vortex_array::arrays::struct_::StructArrayExt;
15use vortex_array::dtype::DType;
16use vortex_array::dtype::Nullability;
17use vortex_array::expr::stats::Stat;
18use vortex_array::stats::StatsSet;
19use vortex_error::VortexExpect;
20use vortex_error::VortexResult;
21use vortex_error::vortex_panic;
22use vortex_session::VortexSession;
23
24use crate::layouts::zoned::StatsAccumulator;
25use crate::sequence::SendableSequentialStream;
26use crate::sequence::SequenceId;
27use crate::sequence::SequentialStreamAdapter;
28use crate::sequence::SequentialStreamExt;
29
30pub fn accumulate_stats(
31    stream: SendableSequentialStream,
32    stats: Arc<[Stat]>,
33    max_variable_length_statistics_size: usize,
34    session: &VortexSession,
35) -> (FileStatsAccumulator, SendableSequentialStream) {
36    let accumulator = FileStatsAccumulator::new(
37        stream.dtype(),
38        stats,
39        max_variable_length_statistics_size,
40        session,
41    );
42    let stream = SequentialStreamAdapter::new(
43        stream.dtype().clone(),
44        stream.scan(accumulator.clone(), |acc, item| {
45            future::ready(Some(acc.process(item)))
46        }),
47    )
48    .sendable();
49    (accumulator, stream)
50}
51
52/// An array stream processor that computes aggregate statistics for all fields.
53///
54/// Note: for now this only collects top-level struct fields.
55#[derive(Clone)]
56pub struct FileStatsAccumulator {
57    stats: Arc<[Stat]>,
58    accumulators: Arc<Mutex<Vec<StatsAccumulator>>>,
59    ctx: Arc<Mutex<ExecutionCtx>>,
60}
61
62impl FileStatsAccumulator {
63    fn new(
64        dtype: &DType,
65        stats: Arc<[Stat]>,
66        max_variable_length_statistics_size: usize,
67        session: &VortexSession,
68    ) -> Self {
69        let accumulators = Arc::new(Mutex::new(match dtype.as_struct_fields_opt() {
70            Some(struct_dtype) => {
71                if dtype.nullability() == Nullability::Nullable {
72                    // top level dtype could be nullable, but we don't support it yet
73                    vortex_panic!(
74                        "FileStatsAccumulator temporarily does not support nullable top-level structs, got: {}. Use Validity::NonNullable",
75                        dtype
76                    );
77                }
78
79                struct_dtype
80                    .fields()
81                    .map(|field_dtype| {
82                        StatsAccumulator::new(
83                            &field_dtype,
84                            &stats,
85                            max_variable_length_statistics_size,
86                        )
87                    })
88                    .collect()
89            }
90            None => [StatsAccumulator::new(
91                dtype,
92                &stats,
93                max_variable_length_statistics_size,
94            )]
95            .into(),
96        }));
97
98        Self {
99            stats,
100            accumulators,
101            ctx: Arc::new(Mutex::new(session.create_execution_ctx())),
102        }
103    }
104
105    fn process(
106        &self,
107        chunk: VortexResult<(SequenceId, ArrayRef)>,
108    ) -> VortexResult<(SequenceId, ArrayRef)> {
109        let (sequence_id, chunk) = chunk?;
110        let mut ctx = self.ctx.lock();
111        if chunk.dtype().is_struct() {
112            let struct_chunk = chunk.clone().execute::<StructArray>(&mut ctx)?;
113            for (acc, field) in self
114                .accumulators
115                .lock()
116                .iter_mut()
117                .zip_eq(struct_chunk.iter_unmasked_fields())
118            {
119                acc.push_chunk(field, &mut ctx)?;
120            }
121        } else {
122            self.accumulators.lock()[0].push_chunk(&chunk, &mut ctx)?;
123        }
124        Ok((sequence_id, chunk))
125    }
126
127    pub fn stats_sets(&self) -> Vec<StatsSet> {
128        let mut ctx = self.ctx.lock();
129        self.accumulators
130            .lock()
131            .iter_mut()
132            .map(|acc| {
133                acc.as_stats_set(&self.stats, &mut ctx)
134                    .vortex_expect("as_stats_table should not fail")
135            })
136            .collect()
137    }
138}