vortex_layout/layouts/
file_stats.rs1use std::future;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use itertools::Itertools;
9use parking_lot::Mutex;
10use vortex_array::stats::{Stat, StatsSet};
11use vortex_array::{ArrayRef, ToCanonical as _};
12use vortex_dtype::{DType, Nullability};
13use vortex_error::{VortexExpect, VortexResult, vortex_panic};
14
15use crate::layouts::zoned::zone_map::StatsAccumulator;
16use crate::sequence::SequenceId;
17use crate::{SendableSequentialStream, SequentialStreamAdapter, SequentialStreamExt};
18
19pub fn accumulate_stats(
20 stream: SendableSequentialStream,
21 stats: Arc<[Stat]>,
22 max_variable_length_statistics_size: usize,
23) -> (FileStatsAccumulator, SendableSequentialStream) {
24 let accumulator =
25 FileStatsAccumulator::new(stream.dtype(), stats, max_variable_length_statistics_size);
26 let stream = SequentialStreamAdapter::new(
27 stream.dtype().clone(),
28 stream.scan(accumulator.clone(), |acc, item| {
29 future::ready(Some(acc.process(item)))
30 }),
31 )
32 .sendable();
33 (accumulator, stream)
34}
35
36#[derive(Clone)]
40pub struct FileStatsAccumulator {
41 stats: Arc<[Stat]>,
42 accumulators: Arc<Mutex<Vec<StatsAccumulator>>>,
43}
44
45impl FileStatsAccumulator {
46 fn new(dtype: &DType, stats: Arc<[Stat]>, max_variable_length_statistics_size: usize) -> Self {
47 let accumulators = Arc::new(Mutex::new(match dtype.as_struct_opt() {
48 Some(struct_dtype) => {
49 if dtype.nullability() == Nullability::Nullable {
50 vortex_panic!(
52 "FileStatsAccumulator temporarily does not support nullable top-level structs, got: {}. Use Validity::NonNullable",
53 dtype
54 );
55 }
56
57 struct_dtype
58 .fields()
59 .map(|field_dtype| {
60 StatsAccumulator::new(
61 &field_dtype,
62 &stats,
63 max_variable_length_statistics_size,
64 )
65 })
66 .collect()
67 }
68 None => [StatsAccumulator::new(
69 dtype,
70 &stats,
71 max_variable_length_statistics_size,
72 )]
73 .into(),
74 }));
75
76 Self {
77 stats,
78 accumulators,
79 }
80 }
81
82 fn process(
83 &self,
84 chunk: VortexResult<(SequenceId, ArrayRef)>,
85 ) -> VortexResult<(SequenceId, ArrayRef)> {
86 let (sequence_id, chunk) = chunk?;
87 if chunk.dtype().is_struct() {
88 let chunk = chunk.to_struct()?;
89 for (acc, field) in self.accumulators.lock().iter_mut().zip_eq(chunk.fields()) {
90 acc.push_chunk(field)?;
91 }
92 } else {
93 self.accumulators.lock()[0].push_chunk(&chunk)?;
94 }
95 Ok((sequence_id, chunk))
96 }
97
98 pub fn stats_sets(&self) -> Vec<StatsSet> {
99 self.accumulators
100 .lock()
101 .iter_mut()
102 .map(|acc| {
103 acc.as_stats_table()
104 .map(|table| {
105 table
106 .to_stats_set(&self.stats)
107 .vortex_expect("shouldn't fail to convert table we just created")
108 })
109 .unwrap_or_default()
110 })
111 .collect()
112 }
113}