vortex_layout/layouts/zoned/
writer.rs1use std::sync::Arc;
2
3use arcref::ArcRef;
4use itertools::Itertools;
5use vortex_array::stats::{PRUNING_STATS, Stat};
6use vortex_array::{Array, ArrayContext, ArrayRef};
7use vortex_dtype::DType;
8use vortex_error::{VortexResult, vortex_bail};
9
10use crate::layouts::zoned::ZonedLayout;
11use crate::layouts::zoned::zone_map::StatsAccumulator;
12use crate::segments::SegmentWriter;
13use crate::writer::{LayoutWriter, LayoutWriterExt};
14use crate::{IntoLayout, LayoutRef, LayoutStrategy};
15
16pub struct ZonedLayoutOptions {
17 pub block_size: usize,
19 pub stats: Arc<[Stat]>,
21 pub max_variable_length_statistics_size: usize,
23}
24
25impl Default for ZonedLayoutOptions {
26 fn default() -> Self {
27 Self {
28 block_size: 8192,
29 stats: PRUNING_STATS.into(),
30 max_variable_length_statistics_size: 64,
31 }
32 }
33}
34
35pub struct ZonedLayoutWriter {
36 ctx: ArrayContext,
37 options: ZonedLayoutOptions,
38 data_writer: Box<dyn LayoutWriter>,
39 zone_map_strategy: ArcRef<dyn LayoutStrategy>,
40 stats_accumulator: StatsAccumulator,
41 dtype: DType,
42
43 nblocks: usize,
44 final_block: bool,
46}
47
48impl ZonedLayoutWriter {
49 pub fn new(
50 ctx: ArrayContext,
51 dtype: &DType,
52 child_writer: Box<dyn LayoutWriter>,
56 stats_strategy: ArcRef<dyn LayoutStrategy>,
57 options: ZonedLayoutOptions,
58 ) -> Self {
59 let present_stats: Arc<[Stat]> = options.stats.iter().sorted().copied().collect();
60 let stats_accumulator = StatsAccumulator::new(
61 dtype,
62 &present_stats,
63 options.max_variable_length_statistics_size,
64 );
65
66 Self {
67 ctx,
68 options,
69 data_writer: child_writer,
70 zone_map_strategy: stats_strategy,
71 stats_accumulator,
72 dtype: dtype.clone(),
73 nblocks: 0,
74 final_block: false,
75 }
76 }
77}
78
79impl LayoutWriter for ZonedLayoutWriter {
80 fn push_chunk(
81 &mut self,
82 segment_writer: &mut dyn SegmentWriter,
83 chunk: ArrayRef,
84 ) -> VortexResult<()> {
85 assert_eq!(
86 chunk.dtype(),
87 &self.dtype,
88 "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
89 chunk.dtype(),
90 self.dtype
91 );
92 if chunk.len() > self.options.block_size {
93 vortex_bail!(
94 "Chunks passed to StatsLayoutWriter must be block_size in length, except the final block. Use RepartitionWriter to split chunks into blocks."
95 );
96 }
97 if self.final_block {
98 vortex_bail!(
99 "Cannot push chunks to StatsLayoutWriter after the final block has been written."
100 );
101 }
102 if chunk.len() < self.options.block_size {
103 self.final_block = true;
104 }
105
106 self.nblocks += 1;
107 self.stats_accumulator.push_chunk(&chunk)?;
108 self.data_writer.push_chunk(segment_writer, chunk)
109 }
110
111 fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
112 self.data_writer.flush(segment_writer)
113 }
114
115 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<LayoutRef> {
116 let data = self.data_writer.finish(segment_writer)?;
117
118 let Some(stats_table) = self.stats_accumulator.as_stats_table() else {
120 return Ok(data);
123 };
124
125 let stats_array = stats_table.array();
128 let mut stats_writer = self
129 .zone_map_strategy
130 .new_writer(&self.ctx, stats_array.dtype())?;
131 let zones_layout = stats_writer.push_one(segment_writer, stats_table.array().to_array())?;
132
133 Ok(ZonedLayout::new(
134 data,
135 zones_layout,
136 self.options.block_size,
137 stats_table.present_stats().clone(),
138 )
139 .into_layout())
140 }
141}