vortex_layout/layouts/file_stats.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use itertools::Itertools;
9use parking_lot::Mutex;
10use vortex_array::stats::{Stat, StatsSet};
11use vortex_array::{ArrayRef, ToCanonical as _};
12use vortex_dtype::{DType, Nullability};
13use vortex_error::{VortexExpect, VortexResult, vortex_panic};
14
15use crate::layouts::zoned::zone_map::StatsAccumulator;
16use crate::sequence::{
17    SendableSequentialStream, SequenceId, SequentialStreamAdapter, SequentialStreamExt,
18};
19
20pub fn accumulate_stats(
21    stream: SendableSequentialStream,
22    stats: Arc<[Stat]>,
23    max_variable_length_statistics_size: usize,
24) -> (FileStatsAccumulator, SendableSequentialStream) {
25    let accumulator =
26        FileStatsAccumulator::new(stream.dtype(), stats, max_variable_length_statistics_size);
27    let stream = SequentialStreamAdapter::new(
28        stream.dtype().clone(),
29        stream.scan(accumulator.clone(), |acc, item| {
30            future::ready(Some(acc.process(item)))
31        }),
32    )
33    .sendable();
34    (accumulator, stream)
35}
36
37/// An array stream processor that computes aggregate statistics for all fields.
38///
39/// Note: for now this only collects top-level struct fields.
40#[derive(Clone)]
41pub struct FileStatsAccumulator {
42    stats: Arc<[Stat]>,
43    accumulators: Arc<Mutex<Vec<StatsAccumulator>>>,
44}
45
46impl FileStatsAccumulator {
47    fn new(dtype: &DType, stats: Arc<[Stat]>, max_variable_length_statistics_size: usize) -> Self {
48        let accumulators = Arc::new(Mutex::new(match dtype.as_struct_fields_opt() {
49            Some(struct_dtype) => {
50                if dtype.nullability() == Nullability::Nullable {
51                    // top level dtype could be nullable, but we don't support it yet
52                    vortex_panic!(
53                        "FileStatsAccumulator temporarily does not support nullable top-level structs, got: {}. Use Validity::NonNullable",
54                        dtype
55                    );
56                }
57
58                struct_dtype
59                    .fields()
60                    .map(|field_dtype| {
61                        StatsAccumulator::new(
62                            &field_dtype,
63                            &stats,
64                            max_variable_length_statistics_size,
65                        )
66                    })
67                    .collect()
68            }
69            None => [StatsAccumulator::new(
70                dtype,
71                &stats,
72                max_variable_length_statistics_size,
73            )]
74            .into(),
75        }));
76
77        Self {
78            stats,
79            accumulators,
80        }
81    }
82
83    fn process(
84        &self,
85        chunk: VortexResult<(SequenceId, ArrayRef)>,
86    ) -> VortexResult<(SequenceId, ArrayRef)> {
87        let (sequence_id, chunk) = chunk?;
88        if chunk.dtype().is_struct() {
89            let chunk = chunk.to_struct();
90            for (acc, field) in self
91                .accumulators
92                .lock()
93                .iter_mut()
94                .zip_eq(chunk.fields().iter())
95            {
96                acc.push_chunk(field)?;
97            }
98        } else {
99            self.accumulators.lock()[0].push_chunk(&chunk)?;
100        }
101        Ok((sequence_id, chunk))
102    }
103
104    pub fn stats_sets(&self) -> Vec<StatsSet> {
105        self.accumulators
106            .lock()
107            .iter_mut()
108            .map(|acc| {
109                acc.as_stats_table()
110                    .map(|table| {
111                        table
112                            .to_stats_set(&self.stats)
113                            .vortex_expect("shouldn't fail to convert table we just created")
114                    })
115                    .unwrap_or_default()
116            })
117            .collect()
118    }
119}