vortex_layout/
stats.rs

1use std::sync::Arc;
2
3use itertools::Itertools;
4use vortex_array::ArrayRef;
5use vortex_array::stats::{Stat, StatsSet};
6use vortex_dtype::DType;
7use vortex_error::{VortexExpect, VortexResult};
8
9use crate::layouts::stats::stats_table::StatsAccumulator;
10use crate::segments::SegmentWriter;
11use crate::{Layout, LayoutWriter};
12
13/// A layout writer that computes aggregate statistics for all fields.
14///
15/// Note: for now this only collects top-level struct fields.
16pub struct FileStatsLayoutWriter {
17    inner: Box<dyn LayoutWriter>,
18    stats: Arc<[Stat]>,
19    stats_accumulators: Vec<StatsAccumulator>,
20}
21
22impl FileStatsLayoutWriter {
23    pub fn new(
24        inner: Box<dyn LayoutWriter>,
25        dtype: &DType,
26        stats: Arc<[Stat]>,
27    ) -> VortexResult<Self> {
28        let stats_accumulators = match dtype.as_struct() {
29            Some(dtype) => dtype
30                .fields()
31                .map(|field_dtype| StatsAccumulator::new(field_dtype, &stats))
32                .collect(),
33            None => [StatsAccumulator::new(dtype.clone(), &stats)].into(),
34        };
35
36        Ok(Self {
37            inner,
38            stats,
39            stats_accumulators,
40        })
41    }
42
43    /// Returns one [`StatsSet`] per field in the [`DType::Struct`] of the layout.
44    pub fn into_stats_sets(self) -> Vec<StatsSet> {
45        self.stats_accumulators
46            .into_iter()
47            .map(|mut acc| {
48                acc.as_stats_table()
49                    .map(|table| {
50                        table
51                            .to_stats_set(&self.stats)
52                            .vortex_expect("shouldn't fail to convert table we just created")
53                    })
54                    .unwrap_or_default()
55            })
56            .collect()
57    }
58}
59
60impl LayoutWriter for FileStatsLayoutWriter {
61    fn push_chunk(
62        &mut self,
63        segment_writer: &mut dyn SegmentWriter,
64        chunk: ArrayRef,
65    ) -> VortexResult<()> {
66        match chunk.as_struct_typed() {
67            None => {
68                self.stats_accumulators[0].push_chunk(&chunk)?;
69            }
70            Some(array) => {
71                for (acc, field) in self.stats_accumulators.iter_mut().zip_eq(array.fields()) {
72                    acc.push_chunk(&field)?;
73                }
74            }
75        }
76        self.inner.push_chunk(segment_writer, chunk)
77    }
78
79    fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
80        self.inner.flush(segment_writer)
81    }
82
83    fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
84        self.inner.finish(segment_writer)
85    }
86}