vortex_layout/layouts/zoned/
writer.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::StreamExt as _;
8use parking_lot::Mutex;
9use vortex_array::ArrayContext;
10use vortex_array::stats::{PRUNING_STATS, Stat};
11use vortex_error::VortexResult;
12use vortex_io::runtime::Handle;
13
14use crate::layouts::zoned::ZonedLayout;
15use crate::layouts::zoned::zone_map::StatsAccumulator;
16use crate::segments::SegmentSinkRef;
17use crate::sequence::{
18 SendableSequentialStream, SequencePointer, SequentialArrayStreamExt, SequentialStreamAdapter,
19 SequentialStreamExt,
20};
21use crate::{IntoLayout, LayoutRef, LayoutStrategy};
22
23pub struct ZonedLayoutOptions {
24 pub block_size: usize,
26 pub stats: Arc<[Stat]>,
28 pub max_variable_length_statistics_size: usize,
30 pub concurrency: usize,
32}
33
34impl Default for ZonedLayoutOptions {
35 fn default() -> Self {
36 Self {
37 block_size: 8192,
38 stats: PRUNING_STATS.into(),
39 max_variable_length_statistics_size: 64,
40 concurrency: std::thread::available_parallelism()
41 .map(|v| v.get())
42 .unwrap_or(1),
43 }
44 }
45}
46
47pub struct ZonedStrategy {
48 child: Arc<dyn LayoutStrategy>,
49 stats: Arc<dyn LayoutStrategy>,
50 options: ZonedLayoutOptions,
51}
52
53impl ZonedStrategy {
54 pub fn new<Child: LayoutStrategy, Stats: LayoutStrategy>(
55 child: Child,
56 stats: Stats,
57 options: ZonedLayoutOptions,
58 ) -> Self {
59 Self {
60 child: Arc::new(child),
61 stats: Arc::new(stats),
62 options,
63 }
64 }
65}
66
67#[async_trait]
68impl LayoutStrategy for ZonedStrategy {
69 async fn write_stream(
70 &self,
71 ctx: ArrayContext,
72 segment_sink: SegmentSinkRef,
73 stream: SendableSequentialStream,
74 mut eof: SequencePointer,
75 handle: Handle,
76 ) -> VortexResult<LayoutRef> {
77 let stats = self.options.stats.clone();
78 let handle2 = handle.clone();
79
80 let stats_accumulator = Arc::new(Mutex::new(StatsAccumulator::new(
81 stream.dtype(),
82 &stats,
83 self.options.max_variable_length_statistics_size,
84 )));
85
86 let stream = SequentialStreamAdapter::new(
88 stream.dtype().clone(),
89 stream
90 .map(move |chunk| {
91 let stats = stats.clone();
92 handle2.spawn_cpu(move || {
93 let (sequence_id, chunk) = chunk?;
94 chunk.statistics().compute_all(&stats)?;
95 VortexResult::Ok((sequence_id, chunk))
96 })
97 })
98 .buffered(self.options.concurrency),
99 )
100 .sendable();
101
102 let stats_accumulator2 = stats_accumulator.clone();
105 let stream = SequentialStreamAdapter::new(
106 stream.dtype().clone(),
107 stream.map(move |item| {
108 let (sequence_id, chunk) = item?;
109 stats_accumulator2
111 .lock()
112 .push_chunk_without_compute(&chunk)?;
113 Ok((sequence_id, chunk))
114 }),
115 )
116 .sendable();
117
118 let block_size = self.options.block_size;
119
120 let data_eof = eof.split_off();
122 let data_layout = self
123 .child
124 .write_stream(
125 ctx.clone(),
126 segment_sink.clone(),
127 stream,
128 data_eof,
129 handle.clone(),
130 )
131 .await?;
132
133 let Some(stats_table) = stats_accumulator.lock().as_stats_table() else {
134 return Ok(data_layout);
137 };
138
139 let stats_stream = stats_table
142 .array()
143 .to_array_stream()
144 .sequenced(eof.split_off());
145 let zones_layout = self
146 .stats
147 .write_stream(ctx, segment_sink.clone(), stats_stream, eof, handle)
148 .await?;
149
150 Ok(ZonedLayout::new(
151 data_layout,
152 zones_layout,
153 block_size,
154 stats_table.present_stats().clone(),
155 )
156 .into_layout())
157 }
158
159 fn buffered_bytes(&self) -> u64 {
160 self.child.buffered_bytes() + self.stats.buffered_bytes()
161 }
162}