1use std::sync::Arc;
2
3use itertools::Itertools;
4use vortex_array::ArrayRef;
5use vortex_array::stats::{Stat, StatsSet};
6use vortex_dtype::DType;
7use vortex_error::{VortexExpect, VortexResult};
8
9use crate::layouts::stats::stats_table::StatsAccumulator;
10use crate::segments::SegmentWriter;
11use crate::{Layout, LayoutWriter};
12
13pub struct FileStatsLayoutWriter {
17 inner: Box<dyn LayoutWriter>,
18 stats: Arc<[Stat]>,
19 stats_accumulators: Vec<StatsAccumulator>,
20}
21
22impl FileStatsLayoutWriter {
23 pub fn new(
24 inner: Box<dyn LayoutWriter>,
25 dtype: &DType,
26 stats: Arc<[Stat]>,
27 ) -> VortexResult<Self> {
28 let stats_accumulators = match dtype.as_struct() {
29 Some(dtype) => dtype
30 .fields()
31 .map(|field_dtype| StatsAccumulator::new(field_dtype, &stats))
32 .collect(),
33 None => [StatsAccumulator::new(dtype.clone(), &stats)].into(),
34 };
35
36 Ok(Self {
37 inner,
38 stats,
39 stats_accumulators,
40 })
41 }
42
43 pub fn into_stats_sets(self) -> Vec<StatsSet> {
45 self.stats_accumulators
46 .into_iter()
47 .map(|mut acc| {
48 acc.as_stats_table()
49 .map(|table| {
50 table
51 .to_stats_set(&self.stats)
52 .vortex_expect("shouldn't fail to convert table we just created")
53 })
54 .unwrap_or_default()
55 })
56 .collect()
57 }
58}
59
60impl LayoutWriter for FileStatsLayoutWriter {
61 fn push_chunk(
62 &mut self,
63 segment_writer: &mut dyn SegmentWriter,
64 chunk: ArrayRef,
65 ) -> VortexResult<()> {
66 match chunk.as_struct_typed() {
67 None => {
68 self.stats_accumulators[0].push_chunk(&chunk)?;
69 }
70 Some(array) => {
71 for (acc, field) in self.stats_accumulators.iter_mut().zip_eq(array.fields()) {
72 acc.push_chunk(&field)?;
73 }
74 }
75 }
76 self.inner.push_chunk(segment_writer, chunk)
77 }
78
79 fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
80 self.inner.flush(segment_writer)
81 }
82
83 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<Layout> {
84 self.inner.finish(segment_writer)
85 }
86}