1use std::collections::{BTreeSet, VecDeque};
2
3use bytes::Bytes;
4use itertools::Itertools;
5use vortex_error::VortexResult;
6use vortex_flatbuffers::footer;
7
8use crate::file::read::buffered::{BufferedLayoutReader, RangedLayoutReader};
9use crate::file::read::cache::RelativeLayoutCache;
10use crate::file::read::mask::RowMask;
11use crate::file::{
12 BatchRead, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, LayoutSpec, Scan,
13 CHUNKED_LAYOUT_ID,
14};
/// [`LayoutSpec`] for layouts tagged with [`CHUNKED_LAYOUT_ID`]: a layout whose children each
/// cover a contiguous range of rows.
#[derive(Debug, Default)]
pub struct ChunkedLayoutSpec;
17
18impl LayoutSpec for ChunkedLayoutSpec {
19 fn id(&self) -> LayoutId {
20 CHUNKED_LAYOUT_ID
21 }
22
23 fn layout_reader(
24 &self,
25 fb_bytes: Bytes,
26 fb_loc: usize,
27 scan: Scan,
28 layout_builder: LayoutDeserializer,
29 message_cache: RelativeLayoutCache,
30 ) -> VortexResult<Box<dyn LayoutReader>> {
31 Ok(Box::new(ChunkedLayout::new(
32 fb_bytes,
33 fb_loc,
34 scan,
35 layout_builder,
36 message_cache,
37 )))
38 }
39}
40
/// Reader for a chunked layout: a flatbuffer `footer::Layout` whose data children each cover a
/// contiguous range of rows, optionally preceded by a metadata child.
#[derive(Debug)]
pub struct ChunkedLayout {
    /// Flatbuffer bytes containing this layout's table.
    fb_bytes: Bytes,
    /// Offset of this layout's `footer::Layout` table within `fb_bytes`.
    fb_loc: usize,
    /// Scan that is cloned into every child layout reader.
    scan: Scan,
    /// Deserializer used to turn child layout tables into readers.
    layout_builder: LayoutDeserializer,
    /// Message cache for this layout; per-child caches are derived from it.
    message_cache: RelativeLayoutCache,
    /// Buffered reader over all data children; `None` until the first `read_selection` call.
    chunk_reader: Option<BufferedLayoutReader>,
}
54
55impl ChunkedLayout {
56 pub fn new(
57 fb_bytes: Bytes,
58 fb_loc: usize,
59 scan: Scan,
60 layout_builder: LayoutDeserializer,
61 message_cache: RelativeLayoutCache,
62 ) -> Self {
63 Self {
64 fb_bytes,
65 fb_loc,
66 scan,
67 layout_builder,
68 message_cache,
69 chunk_reader: None,
70 }
71 }
72
73 fn flatbuffer(&self) -> footer::Layout {
74 unsafe {
75 let tab = flatbuffers::Table::new(&self.fb_bytes, self.fb_loc);
76 footer::Layout::init_from_table(tab)
77 }
78 }
79
80 fn has_metadata(&self) -> bool {
81 self.flatbuffer()
82 .metadata()
83 .map(|b| b.bytes()[0] != 0)
84 .unwrap_or(false)
85 }
86
87 fn children(&self) -> impl Iterator<Item = (usize, footer::Layout)> {
88 self.flatbuffer()
89 .children()
90 .unwrap_or_default()
91 .iter()
92 .enumerate()
93 .skip(if self.has_metadata() { 1 } else { 0 })
94 }
95
96 fn child_ranges(&self) -> Vec<(usize, usize)> {
97 self.children()
98 .map(|(_, c)| c.row_count())
99 .scan(0u64, |acc, row_count| {
100 let current = *acc;
101 *acc += row_count;
102 Some((current as usize, *acc as usize))
103 })
104 .collect::<Vec<_>>()
105 }
106
107 fn child_layouts<C: Fn(LayoutPartId) -> RelativeLayoutCache>(
108 &self,
109 cache: C,
110 ) -> VortexResult<VecDeque<RangedLayoutReader>> {
111 self.children()
112 .zip_eq(self.child_ranges())
113 .map(|((i, c), (begin, end))| {
114 let layout = self.layout_builder.read_layout(
115 self.fb_bytes.clone(),
116 c._tab.loc(),
117 self.scan.clone(),
118 cache(i as u16),
119 )?;
120 Ok(((begin, end), layout))
121 })
122 .collect::<VortexResult<VecDeque<_>>>()
123 }
124}
125
126impl LayoutReader for ChunkedLayout {
127 fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet<usize>) -> VortexResult<()> {
128 for ((begin, _), child) in self.child_layouts(|i| self.message_cache.unknown_dtype(i))? {
129 child.add_splits(row_offset + begin, splits)?
130 }
131 Ok(())
132 }
133
134 fn read_selection(&mut self, selector: &RowMask) -> VortexResult<Option<BatchRead>> {
135 if let Some(br) = &mut self.chunk_reader {
136 br.read_next(selector)
137 } else {
138 self.chunk_reader = Some(BufferedLayoutReader::new(self.child_layouts(|i| {
139 self.message_cache
140 .relative(i, self.message_cache.dtype().clone())
141 })?));
142 self.read_selection(selector)
143 }
144 }
145}
146
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::iter;
    use std::sync::{Arc, RwLock};

    use bytes::Bytes;
    use croaring::Bitmap;
    use flatbuffers::{root_unchecked, FlatBufferBuilder};
    use futures_util::TryStreamExt;
    use vortex_array::array::{ChunkedArray, PrimitiveArray};
    use vortex_array::{ArrayDType, IntoArray, IntoArrayVariant};
    use vortex_dtype::PType;
    use vortex_expr::{BinaryExpr, Identity, Literal, Operator};
    use vortex_flatbuffers::{footer, WriteFlatBuffer};

    use crate::file::read::cache::{LazilyDeserializedDType, RelativeLayoutCache};
    use crate::file::read::layouts::chunked::ChunkedLayout;
    use crate::file::read::layouts::test_read::{
        filter_read_layout, read_layout, read_layout_data,
    };
    use crate::file::read::mask::RowMask;
    use crate::file::{write, LayoutDeserializer, LayoutMessageCache, RowFilter, Scan};
    use crate::messages::writer::MessageWriter;
    use crate::stream_writer::ByteRange;

    /// Writes a 500-row chunked array (five chunks of `0..100`) as batch messages and builds a
    /// chunked layout over them with one flat child layout per chunk.
    ///
    /// Returns `(layout using scan, layout with Scan::new(None), message bytes, row count)`.
    /// Both layouts share `cache` and point at the same flatbuffer.
    async fn layout_and_bytes(
        cache: Arc<RwLock<LayoutMessageCache>>,
        scan: Scan,
    ) -> (ChunkedLayout, ChunkedLayout, Bytes, usize) {
        let mut writer = MessageWriter::new(Vec::new());
        let array = PrimitiveArray::from((0..100).collect::<Vec<_>>()).into_array();
        let array_dtype = array.dtype().clone();
        let chunked =
            ChunkedArray::try_new(iter::repeat(array).take(5).collect(), array_dtype).unwrap();
        let len = chunked.len();
        // Byte and row offsets at every chunk boundary, including the leading 0.
        let mut byte_offsets = vec![writer.tell()];
        let mut row_offsets = vec![0];
        let mut row_offset = 0;

        let mut chunk_stream = chunked.array_stream();
        while let Some(chunk) = chunk_stream.try_next().await.unwrap() {
            row_offset += chunk.len() as u64;
            row_offsets.push(row_offset);
            writer.write_batch(chunk).await.unwrap();
            byte_offsets.push(writer.tell());
        }
        // Consecutive byte offsets give each chunk's byte range, and consecutive row offsets
        // give its row count.
        let flat_layouts = byte_offsets
            .iter()
            .zip(byte_offsets.iter().skip(1))
            .zip(
                row_offsets
                    .iter()
                    .zip(row_offsets.iter().skip(1))
                    .map(|(begin, end)| end - begin),
            )
            .map(|((begin, end), len)| write::Layout::flat(ByteRange::new(*begin, *end), len))
            .collect::<VecDeque<_>>();

        let written = writer.into_inner();

        let mut fb = FlatBufferBuilder::new();
        let chunked_layout = write::Layout::chunked(flat_layouts.into(), len as u64, false);
        let flat_buf = chunked_layout.write_flatbuffer(&mut fb);
        fb.finish_minimal(flat_buf);
        let fb_bytes = Bytes::copy_from_slice(fb.finished_data());

        // SAFETY: `fb_bytes` was just produced by `finish_minimal` with a `footer::Layout` root.
        let fb_loc = (unsafe { root_unchecked::<footer::Layout>(&fb_bytes) })
            ._tab
            .loc();

        let dtype = Arc::new(LazilyDeserializedDType::from_dtype(PType::I32.into()));
        (
            ChunkedLayout::new(
                fb_bytes.clone(),
                fb_loc,
                scan,
                LayoutDeserializer::default(),
                RelativeLayoutCache::new(cache.clone(), dtype.clone()),
            ),
            ChunkedLayout::new(
                fb_bytes,
                fb_loc,
                Scan::new(None),
                LayoutDeserializer::default(),
                RelativeLayoutCache::new(cache, dtype),
            ),
            Bytes::from(written),
            len,
        )
    }

    /// Filtering with `identity > 10` yields the first chunk's values above 10 as the first batch.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_range() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (mut filter_layout, mut projection_layout, buf, length) = layout_and_bytes(
            cache.clone(),
            Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new(
                Arc::new(Identity),
                Operator::Gt,
                Arc::new(Literal::new(10.into())),
            )))))),
        )
        .await;
        let arr = filter_read_layout(
            &mut filter_layout,
            &mut projection_layout,
            cache,
            &buf,
            length,
        )
        .pop_front();

        assert!(arr.is_some());
        let arr = arr.unwrap();
        assert_eq!(
            arr.into_primitive().unwrap().maybe_null_slice::<i32>(),
            &(11..100).collect::<Vec<_>>()
        );
    }

    /// Without a filter, the first batch read is the whole first chunk.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_range_no_filter() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (_, mut projection_layout, buf, length) =
            layout_and_bytes(cache.clone(), Scan::new(None)).await;
        let arr = read_layout(&mut projection_layout, cache, &buf, length).pop_front();

        assert!(arr.is_some());
        let arr = arr.unwrap();
        assert_eq!(
            arr.into_primitive().unwrap().maybe_null_slice::<i32>(),
            (0..100).collect::<Vec<_>>()
        );
    }

    /// A single mask selecting all 500 rows returns every chunk concatenated.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_no_range() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (_, mut projection_layout, buf, _) =
            layout_and_bytes(cache.clone(), Scan::new(None)).await;
        let arr = read_layout_data(
            &mut projection_layout,
            cache,
            &buf,
            &RowMask::try_new(Bitmap::from_range(0..500), 0, 500).unwrap(),
        );

        assert!(arr.is_some());
        let arr = arr.unwrap();
        assert_eq!(
            arr.into_primitive().unwrap().maybe_null_slice::<i32>(),
            iter::repeat(0..100).take(5).flatten().collect::<Vec<_>>()
        );
    }

    /// Successive masks that span chunk boundaries each return exactly their selected rows.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_multiple_selectors() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (_, mut projection_layout, buf, _) =
            layout_and_bytes(cache.clone(), Scan::new(None)).await;
        let mut arr = [
            RowMask::try_new(Bitmap::from_range(0..150), 0, 200).unwrap(),
            RowMask::try_new(Bitmap::from_range(50..150), 200, 400).unwrap(),
            RowMask::try_new(Bitmap::from_range(0..100), 400, 500).unwrap(),
        ]
        .into_iter()
        .flat_map(|s| read_layout_data(&mut projection_layout, cache.clone(), &buf, &s))
        .collect::<VecDeque<_>>();

        assert_eq!(arr.len(), 3);
        assert_eq!(
            arr.pop_front()
                .unwrap()
                .into_primitive()
                .unwrap()
                .maybe_null_slice::<i32>(),
            &(0..100).chain(0..50).collect::<Vec<_>>()
        );
        assert_eq!(
            arr.pop_front()
                .unwrap()
                .into_primitive()
                .unwrap()
                .maybe_null_slice::<i32>(),
            &(50..100).chain(0..50).collect::<Vec<_>>()
        );
        assert_eq!(
            arr.pop_front()
                .unwrap()
                .into_primitive()
                .unwrap()
                .maybe_null_slice::<i32>(),
            &(0..100).collect::<Vec<_>>()
        );
    }
}