vortex_layout/layouts/zoned/
writer.rs1use std::sync::Arc;
2
3use arcref::ArcRef;
4use itertools::Itertools;
5use vortex_array::stats::{PRUNING_STATS, Stat};
6use vortex_array::{Array, ArrayContext, ArrayRef};
7use vortex_dtype::DType;
8use vortex_error::{VortexResult, vortex_bail};
9
10use crate::layouts::zoned::ZonedLayout;
11use crate::layouts::zoned::zone_map::StatsAccumulator;
12use crate::segments::SegmentWriter;
13use crate::writer::{LayoutWriter, LayoutWriterExt};
14use crate::{IntoLayout, LayoutRef, LayoutStrategy};
15
16pub struct ZonedLayoutOptions {
17 pub block_size: usize,
19 pub stats: Arc<[Stat]>,
21}
22
23impl Default for ZonedLayoutOptions {
24 fn default() -> Self {
25 Self {
26 block_size: 8192,
27 stats: PRUNING_STATS.into(),
28 }
29 }
30}
31
32pub struct ZonedLayoutWriter {
33 ctx: ArrayContext,
34 options: ZonedLayoutOptions,
35 data_writer: Box<dyn LayoutWriter>,
36 zone_map_strategy: ArcRef<dyn LayoutStrategy>,
37 stats_accumulator: StatsAccumulator,
38 dtype: DType,
39
40 nblocks: usize,
41 final_block: bool,
43}
44
45impl ZonedLayoutWriter {
46 pub fn new(
47 ctx: ArrayContext,
48 dtype: &DType,
49 child_writer: Box<dyn LayoutWriter>,
53 stats_strategy: ArcRef<dyn LayoutStrategy>,
54 options: ZonedLayoutOptions,
55 ) -> Self {
56 let present_stats: Arc<[Stat]> = options.stats.iter().sorted().copied().collect();
57 let stats_accumulator = StatsAccumulator::new(dtype.clone(), &present_stats);
58
59 Self {
60 ctx,
61 options,
62 data_writer: child_writer,
63 zone_map_strategy: stats_strategy,
64 stats_accumulator,
65 dtype: dtype.clone(),
66 nblocks: 0,
67 final_block: false,
68 }
69 }
70}
71
72impl LayoutWriter for ZonedLayoutWriter {
73 fn push_chunk(
74 &mut self,
75 segment_writer: &mut dyn SegmentWriter,
76 chunk: ArrayRef,
77 ) -> VortexResult<()> {
78 assert_eq!(
79 chunk.dtype(),
80 &self.dtype,
81 "Can't push chunks of the wrong dtype into a LayoutWriter. Pushed {} but expected {}.",
82 chunk.dtype(),
83 self.dtype
84 );
85 if chunk.len() > self.options.block_size {
86 vortex_bail!(
87 "Chunks passed to StatsLayoutWriter must be block_size in length, except the final block. Use RepartitionWriter to split chunks into blocks."
88 );
89 }
90 if self.final_block {
91 vortex_bail!(
92 "Cannot push chunks to StatsLayoutWriter after the final block has been written."
93 );
94 }
95 if chunk.len() < self.options.block_size {
96 self.final_block = true;
97 }
98
99 self.nblocks += 1;
100 self.stats_accumulator.push_chunk(&chunk)?;
101 self.data_writer.push_chunk(segment_writer, chunk)
102 }
103
104 fn flush(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<()> {
105 self.data_writer.flush(segment_writer)
106 }
107
108 fn finish(&mut self, segment_writer: &mut dyn SegmentWriter) -> VortexResult<LayoutRef> {
109 let data = self.data_writer.finish(segment_writer)?;
110
111 let Some(stats_table) = self.stats_accumulator.as_stats_table() else {
113 return Ok(data);
116 };
117
118 let stats_array = stats_table.array();
121 let mut stats_writer = self
122 .zone_map_strategy
123 .new_writer(&self.ctx, stats_array.dtype())?;
124 let zones_layout = stats_writer.push_one(segment_writer, stats_table.array().to_array())?;
125
126 Ok(ZonedLayout::new(
127 data,
128 zones_layout,
129 self.options.block_size,
130 stats_table.present_stats().clone(),
131 )
132 .into_layout())
133 }
134}