Skip to main content

vortex_layout/layouts/
file_stats.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::future;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use itertools::Itertools;
9use parking_lot::Mutex;
10use vortex_array::ArrayRef;
11use vortex_array::LEGACY_SESSION;
12use vortex_array::ToCanonical as _;
13use vortex_array::VortexSessionExecute;
14use vortex_array::dtype::DType;
15use vortex_array::dtype::Nullability;
16use vortex_array::expr::stats::Stat;
17use vortex_array::stats::StatsSet;
18use vortex_error::VortexExpect;
19use vortex_error::VortexResult;
20use vortex_error::vortex_panic;
21
22use crate::layouts::zoned::zone_map::StatsAccumulator;
23use crate::sequence::SendableSequentialStream;
24use crate::sequence::SequenceId;
25use crate::sequence::SequentialStreamAdapter;
26use crate::sequence::SequentialStreamExt;
27
28pub fn accumulate_stats(
29    stream: SendableSequentialStream,
30    stats: Arc<[Stat]>,
31    max_variable_length_statistics_size: usize,
32) -> (FileStatsAccumulator, SendableSequentialStream) {
33    let accumulator =
34        FileStatsAccumulator::new(stream.dtype(), stats, max_variable_length_statistics_size);
35    let stream = SequentialStreamAdapter::new(
36        stream.dtype().clone(),
37        stream.scan(accumulator.clone(), |acc, item| {
38            future::ready(Some(acc.process(item)))
39        }),
40    )
41    .sendable();
42    (accumulator, stream)
43}
44
45/// An array stream processor that computes aggregate statistics for all fields.
46///
47/// Note: for now this only collects top-level struct fields.
48#[derive(Clone)]
49pub struct FileStatsAccumulator {
50    stats: Arc<[Stat]>,
51    accumulators: Arc<Mutex<Vec<StatsAccumulator>>>,
52}
53
54impl FileStatsAccumulator {
55    fn new(dtype: &DType, stats: Arc<[Stat]>, max_variable_length_statistics_size: usize) -> Self {
56        let accumulators = Arc::new(Mutex::new(match dtype.as_struct_fields_opt() {
57            Some(struct_dtype) => {
58                if dtype.nullability() == Nullability::Nullable {
59                    // top level dtype could be nullable, but we don't support it yet
60                    vortex_panic!(
61                        "FileStatsAccumulator temporarily does not support nullable top-level structs, got: {}. Use Validity::NonNullable",
62                        dtype
63                    );
64                }
65
66                struct_dtype
67                    .fields()
68                    .map(|field_dtype| {
69                        StatsAccumulator::new(
70                            &field_dtype,
71                            &stats,
72                            max_variable_length_statistics_size,
73                        )
74                    })
75                    .collect()
76            }
77            None => [StatsAccumulator::new(
78                dtype,
79                &stats,
80                max_variable_length_statistics_size,
81            )]
82            .into(),
83        }));
84
85        Self {
86            stats,
87            accumulators,
88        }
89    }
90
91    fn process(
92        &self,
93        chunk: VortexResult<(SequenceId, ArrayRef)>,
94    ) -> VortexResult<(SequenceId, ArrayRef)> {
95        let (sequence_id, chunk) = chunk?;
96        if chunk.dtype().is_struct() {
97            let chunk = chunk.to_struct();
98            for (acc, field) in self
99                .accumulators
100                .lock()
101                .iter_mut()
102                .zip_eq(chunk.unmasked_fields().iter())
103            {
104                acc.push_chunk(field)?;
105            }
106        } else {
107            self.accumulators.lock()[0].push_chunk(&chunk)?;
108        }
109        Ok((sequence_id, chunk))
110    }
111
112    pub fn stats_sets(&self) -> Vec<StatsSet> {
113        let mut ctx = LEGACY_SESSION.create_execution_ctx();
114        self.accumulators
115            .lock()
116            .iter_mut()
117            .map(|acc| {
118                acc.as_stats_table()
119                    .vortex_expect("as_stats_table should not fail")
120                    .map(|table| {
121                        table
122                            .to_stats_set(&self.stats, &mut ctx)
123                            .vortex_expect("shouldn't fail to convert table we just created")
124                    })
125                    .unwrap_or_default()
126            })
127            .collect()
128    }
129}