vortex_layout/layouts/
file_stats.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use itertools::Itertools;
9use parking_lot::Mutex;
10use vortex_array::ArrayRef;
11use vortex_array::ToCanonical as _;
12use vortex_array::expr::stats::Stat;
13use vortex_array::stats::StatsSet;
14use vortex_dtype::DType;
15use vortex_dtype::Nullability;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_error::vortex_panic;
19
20use crate::layouts::zoned::zone_map::StatsAccumulator;
21use crate::sequence::SendableSequentialStream;
22use crate::sequence::SequenceId;
23use crate::sequence::SequentialStreamAdapter;
24use crate::sequence::SequentialStreamExt;
25
26pub fn accumulate_stats(
27    stream: SendableSequentialStream,
28    stats: Arc<[Stat]>,
29    max_variable_length_statistics_size: usize,
30) -> (FileStatsAccumulator, SendableSequentialStream) {
31    let accumulator =
32        FileStatsAccumulator::new(stream.dtype(), stats, max_variable_length_statistics_size);
33    let stream = SequentialStreamAdapter::new(
34        stream.dtype().clone(),
35        stream.scan(accumulator.clone(), |acc, item| {
36            future::ready(Some(acc.process(item)))
37        }),
38    )
39    .sendable();
40    (accumulator, stream)
41}
42
43/// An array stream processor that computes aggregate statistics for all fields.
44///
45/// Note: for now this only collects top-level struct fields.
46#[derive(Clone)]
47pub struct FileStatsAccumulator {
48    stats: Arc<[Stat]>,
49    accumulators: Arc<Mutex<Vec<StatsAccumulator>>>,
50}
51
52impl FileStatsAccumulator {
53    fn new(dtype: &DType, stats: Arc<[Stat]>, max_variable_length_statistics_size: usize) -> Self {
54        let accumulators = Arc::new(Mutex::new(match dtype.as_struct_fields_opt() {
55            Some(struct_dtype) => {
56                if dtype.nullability() == Nullability::Nullable {
57                    // top level dtype could be nullable, but we don't support it yet
58                    vortex_panic!(
59                        "FileStatsAccumulator temporarily does not support nullable top-level structs, got: {}. Use Validity::NonNullable",
60                        dtype
61                    );
62                }
63
64                struct_dtype
65                    .fields()
66                    .map(|field_dtype| {
67                        StatsAccumulator::new(
68                            &field_dtype,
69                            &stats,
70                            max_variable_length_statistics_size,
71                        )
72                    })
73                    .collect()
74            }
75            None => [StatsAccumulator::new(
76                dtype,
77                &stats,
78                max_variable_length_statistics_size,
79            )]
80            .into(),
81        }));
82
83        Self {
84            stats,
85            accumulators,
86        }
87    }
88
89    fn process(
90        &self,
91        chunk: VortexResult<(SequenceId, ArrayRef)>,
92    ) -> VortexResult<(SequenceId, ArrayRef)> {
93        let (sequence_id, chunk) = chunk?;
94        if chunk.dtype().is_struct() {
95            let chunk = chunk.to_struct();
96            for (acc, field) in self
97                .accumulators
98                .lock()
99                .iter_mut()
100                .zip_eq(chunk.fields().iter())
101            {
102                acc.push_chunk(field)?;
103            }
104        } else {
105            self.accumulators.lock()[0].push_chunk(&chunk)?;
106        }
107        Ok((sequence_id, chunk))
108    }
109
110    pub fn stats_sets(&self) -> Vec<StatsSet> {
111        self.accumulators
112            .lock()
113            .iter_mut()
114            .map(|acc| {
115                acc.as_stats_table()
116                    .map(|table| {
117                        table
118                            .to_stats_set(&self.stats)
119                            .vortex_expect("shouldn't fail to convert table we just created")
120                    })
121                    .unwrap_or_default()
122            })
123            .collect()
124    }
125}