vortex_layout/layouts/
file_stats.rs1use std::future;
5use std::sync::Arc;
6
7use futures::StreamExt;
8use itertools::Itertools;
9use parking_lot::Mutex;
10use vortex_array::stats::{Stat, StatsSet};
11use vortex_array::{ArrayRef, ToCanonical as _};
12use vortex_dtype::{DType, Nullability};
13use vortex_error::{VortexExpect, VortexResult, vortex_panic};
14
15use crate::layouts::zoned::zone_map::StatsAccumulator;
16use crate::sequence::{
17 SendableSequentialStream, SequenceId, SequentialStreamAdapter, SequentialStreamExt,
18};
19
20pub fn accumulate_stats(
21 stream: SendableSequentialStream,
22 stats: Arc<[Stat]>,
23 max_variable_length_statistics_size: usize,
24) -> (FileStatsAccumulator, SendableSequentialStream) {
25 let accumulator =
26 FileStatsAccumulator::new(stream.dtype(), stats, max_variable_length_statistics_size);
27 let stream = SequentialStreamAdapter::new(
28 stream.dtype().clone(),
29 stream.scan(accumulator.clone(), |acc, item| {
30 future::ready(Some(acc.process(item)))
31 }),
32 )
33 .sendable();
34 (accumulator, stream)
35}
36
37#[derive(Clone)]
41pub struct FileStatsAccumulator {
42 stats: Arc<[Stat]>,
43 accumulators: Arc<Mutex<Vec<StatsAccumulator>>>,
44}
45
46impl FileStatsAccumulator {
47 fn new(dtype: &DType, stats: Arc<[Stat]>, max_variable_length_statistics_size: usize) -> Self {
48 let accumulators = Arc::new(Mutex::new(match dtype.as_struct_fields_opt() {
49 Some(struct_dtype) => {
50 if dtype.nullability() == Nullability::Nullable {
51 vortex_panic!(
53 "FileStatsAccumulator temporarily does not support nullable top-level structs, got: {}. Use Validity::NonNullable",
54 dtype
55 );
56 }
57
58 struct_dtype
59 .fields()
60 .map(|field_dtype| {
61 StatsAccumulator::new(
62 &field_dtype,
63 &stats,
64 max_variable_length_statistics_size,
65 )
66 })
67 .collect()
68 }
69 None => [StatsAccumulator::new(
70 dtype,
71 &stats,
72 max_variable_length_statistics_size,
73 )]
74 .into(),
75 }));
76
77 Self {
78 stats,
79 accumulators,
80 }
81 }
82
83 fn process(
84 &self,
85 chunk: VortexResult<(SequenceId, ArrayRef)>,
86 ) -> VortexResult<(SequenceId, ArrayRef)> {
87 let (sequence_id, chunk) = chunk?;
88 if chunk.dtype().is_struct() {
89 let chunk = chunk.to_struct();
90 for (acc, field) in self
91 .accumulators
92 .lock()
93 .iter_mut()
94 .zip_eq(chunk.fields().iter())
95 {
96 acc.push_chunk(field)?;
97 }
98 } else {
99 self.accumulators.lock()[0].push_chunk(&chunk)?;
100 }
101 Ok((sequence_id, chunk))
102 }
103
104 pub fn stats_sets(&self) -> Vec<StatsSet> {
105 self.accumulators
106 .lock()
107 .iter_mut()
108 .map(|acc| {
109 acc.as_stats_table()
110 .map(|table| {
111 table
112 .to_stats_set(&self.stats)
113 .vortex_expect("shouldn't fail to convert table we just created")
114 })
115 .unwrap_or_default()
116 })
117 .collect()
118 }
119}