vortex_layout/layouts/file_stats.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use itertools::Itertools;
9use parking_lot::Mutex;
10use vortex_array::stats::{Stat, StatsSet};
11use vortex_array::{ArrayRef, ToCanonical as _};
12use vortex_dtype::{DType, Nullability};
13use vortex_error::{VortexExpect, VortexResult, vortex_panic};
14
15use crate::layouts::zoned::zone_map::StatsAccumulator;
16use crate::sequence::SequenceId;
17use crate::{SendableSequentialStream, SequentialStreamAdapter, SequentialStreamExt};
18
19pub fn accumulate_stats(
20    stream: SendableSequentialStream,
21    stats: Arc<[Stat]>,
22    max_variable_length_statistics_size: usize,
23) -> (FileStatsAccumulator, SendableSequentialStream) {
24    let accumulator =
25        FileStatsAccumulator::new(stream.dtype(), stats, max_variable_length_statistics_size);
26    let stream = SequentialStreamAdapter::new(
27        stream.dtype().clone(),
28        stream.scan(accumulator.clone(), |acc, item| {
29            future::ready(Some(acc.process(item)))
30        }),
31    )
32    .sendable();
33    (accumulator, stream)
34}
35
36/// An array stream processor that computes aggregate statistics for all fields.
37///
38/// Note: for now this only collects top-level struct fields.
39#[derive(Clone)]
40pub struct FileStatsAccumulator {
41    stats: Arc<[Stat]>,
42    accumulators: Arc<Mutex<Vec<StatsAccumulator>>>,
43}
44
45impl FileStatsAccumulator {
46    fn new(dtype: &DType, stats: Arc<[Stat]>, max_variable_length_statistics_size: usize) -> Self {
47        let accumulators = Arc::new(Mutex::new(match dtype.as_struct_opt() {
48            Some(struct_dtype) => {
49                if dtype.nullability() == Nullability::Nullable {
50                    // top level dtype could be nullable, but we don't support it yet
51                    vortex_panic!(
52                        "FileStatsAccumulator temporarily does not support nullable top-level structs, got: {}. Use Validity::NonNullable",
53                        dtype
54                    );
55                }
56
57                struct_dtype
58                    .fields()
59                    .map(|field_dtype| {
60                        StatsAccumulator::new(
61                            &field_dtype,
62                            &stats,
63                            max_variable_length_statistics_size,
64                        )
65                    })
66                    .collect()
67            }
68            None => [StatsAccumulator::new(
69                dtype,
70                &stats,
71                max_variable_length_statistics_size,
72            )]
73            .into(),
74        }));
75
76        Self {
77            stats,
78            accumulators,
79        }
80    }
81
82    fn process(
83        &self,
84        chunk: VortexResult<(SequenceId, ArrayRef)>,
85    ) -> VortexResult<(SequenceId, ArrayRef)> {
86        let (sequence_id, chunk) = chunk?;
87        if chunk.dtype().is_struct() {
88            let chunk = chunk.to_struct()?;
89            for (acc, field) in self.accumulators.lock().iter_mut().zip_eq(chunk.fields()) {
90                acc.push_chunk(field)?;
91            }
92        } else {
93            self.accumulators.lock()[0].push_chunk(&chunk)?;
94        }
95        Ok((sequence_id, chunk))
96    }
97
98    pub fn stats_sets(&self) -> Vec<StatsSet> {
99        self.accumulators
100            .lock()
101            .iter_mut()
102            .map(|acc| {
103                acc.as_stats_table()
104                    .map(|table| {
105                        table
106                            .to_stats_set(&self.stats)
107                            .vortex_expect("shouldn't fail to convert table we just created")
108                    })
109                    .unwrap_or_default()
110            })
111            .collect()
112    }
113}