vortex_serde/file/read/layouts/
chunked.rs

1use std::collections::{BTreeSet, VecDeque};
2
3use bytes::Bytes;
4use itertools::Itertools;
5use vortex_error::VortexResult;
6use vortex_flatbuffers::footer;
7
8use crate::file::read::buffered::{BufferedLayoutReader, RangedLayoutReader};
9use crate::file::read::cache::RelativeLayoutCache;
10use crate::file::read::mask::RowMask;
11use crate::file::{
12    BatchRead, LayoutDeserializer, LayoutId, LayoutPartId, LayoutReader, LayoutSpec, Scan,
13    CHUNKED_LAYOUT_ID,
14};
15#[derive(Default, Debug)]
16pub struct ChunkedLayoutSpec;
17
18impl LayoutSpec for ChunkedLayoutSpec {
19    fn id(&self) -> LayoutId {
20        CHUNKED_LAYOUT_ID
21    }
22
23    fn layout_reader(
24        &self,
25        fb_bytes: Bytes,
26        fb_loc: usize,
27        scan: Scan,
28        layout_builder: LayoutDeserializer,
29        message_cache: RelativeLayoutCache,
30    ) -> VortexResult<Box<dyn LayoutReader>> {
31        Ok(Box::new(ChunkedLayout::new(
32            fb_bytes,
33            fb_loc,
34            scan,
35            layout_builder,
36            message_cache,
37        )))
38    }
39}
40
41/// In memory representation of Chunked NestedLayout.
42///
43/// First child in the list is the metadata table
44/// Subsequent children are consecutive chunks of this layout
45#[derive(Debug)]
46pub struct ChunkedLayout {
47    fb_bytes: Bytes,
48    fb_loc: usize,
49    scan: Scan,
50    layout_builder: LayoutDeserializer,
51    message_cache: RelativeLayoutCache,
52    chunk_reader: Option<BufferedLayoutReader>,
53}
54
55impl ChunkedLayout {
56    pub fn new(
57        fb_bytes: Bytes,
58        fb_loc: usize,
59        scan: Scan,
60        layout_builder: LayoutDeserializer,
61        message_cache: RelativeLayoutCache,
62    ) -> Self {
63        Self {
64            fb_bytes,
65            fb_loc,
66            scan,
67            layout_builder,
68            message_cache,
69            chunk_reader: None,
70        }
71    }
72
73    fn flatbuffer(&self) -> footer::Layout {
74        unsafe {
75            let tab = flatbuffers::Table::new(&self.fb_bytes, self.fb_loc);
76            footer::Layout::init_from_table(tab)
77        }
78    }
79
80    fn has_metadata(&self) -> bool {
81        self.flatbuffer()
82            .metadata()
83            .map(|b| b.bytes()[0] != 0)
84            .unwrap_or(false)
85    }
86
87    fn children(&self) -> impl Iterator<Item = (usize, footer::Layout)> {
88        self.flatbuffer()
89            .children()
90            .unwrap_or_default()
91            .iter()
92            .enumerate()
93            .skip(if self.has_metadata() { 1 } else { 0 })
94    }
95
96    fn child_ranges(&self) -> Vec<(usize, usize)> {
97        self.children()
98            .map(|(_, c)| c.row_count())
99            .scan(0u64, |acc, row_count| {
100                let current = *acc;
101                *acc += row_count;
102                Some((current as usize, *acc as usize))
103            })
104            .collect::<Vec<_>>()
105    }
106
107    fn child_layouts<C: Fn(LayoutPartId) -> RelativeLayoutCache>(
108        &self,
109        cache: C,
110    ) -> VortexResult<VecDeque<RangedLayoutReader>> {
111        self.children()
112            .zip_eq(self.child_ranges())
113            .map(|((i, c), (begin, end))| {
114                let layout = self.layout_builder.read_layout(
115                    self.fb_bytes.clone(),
116                    c._tab.loc(),
117                    self.scan.clone(),
118                    cache(i as u16),
119                )?;
120                Ok(((begin, end), layout))
121            })
122            .collect::<VortexResult<VecDeque<_>>>()
123    }
124}
125
126impl LayoutReader for ChunkedLayout {
127    fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet<usize>) -> VortexResult<()> {
128        for ((begin, _), child) in self.child_layouts(|i| self.message_cache.unknown_dtype(i))? {
129            child.add_splits(row_offset + begin, splits)?
130        }
131        Ok(())
132    }
133
134    fn read_selection(&mut self, selector: &RowMask) -> VortexResult<Option<BatchRead>> {
135        if let Some(br) = &mut self.chunk_reader {
136            br.read_next(selector)
137        } else {
138            self.chunk_reader = Some(BufferedLayoutReader::new(self.child_layouts(|i| {
139                self.message_cache
140                    .relative(i, self.message_cache.dtype().clone())
141            })?));
142            self.read_selection(selector)
143        }
144    }
145}
146
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;
    use std::iter;
    use std::sync::{Arc, RwLock};

    use bytes::Bytes;
    use croaring::Bitmap;
    use flatbuffers::{root_unchecked, FlatBufferBuilder};
    use futures_util::TryStreamExt;
    use vortex_array::array::{ChunkedArray, PrimitiveArray};
    use vortex_array::{ArrayDType, IntoArray, IntoArrayVariant};
    use vortex_dtype::PType;
    use vortex_expr::{BinaryExpr, Identity, Literal, Operator};
    use vortex_flatbuffers::{footer, WriteFlatBuffer};

    use crate::file::read::cache::{LazilyDeserializedDType, RelativeLayoutCache};
    use crate::file::read::layouts::chunked::ChunkedLayout;
    use crate::file::read::layouts::test_read::{
        filter_read_layout, read_layout, read_layout_data,
    };
    use crate::file::read::mask::RowMask;
    use crate::file::{write, LayoutDeserializer, LayoutMessageCache, RowFilter, Scan};
    use crate::messages::writer::MessageWriter;
    use crate::stream_writer::ByteRange;

    /// Writes five identical chunks of `0..100` and builds a chunked layout (without a metadata
    /// child) over them.
    ///
    /// Returns `(layout using scan, layout with no filter, serialized chunk bytes, row count)`.
    async fn layout_and_bytes(
        cache: Arc<RwLock<LayoutMessageCache>>,
        scan: Scan,
    ) -> (ChunkedLayout, ChunkedLayout, Bytes, usize) {
        let mut writer = MessageWriter::new(Vec::new());
        let array = PrimitiveArray::from((0..100).collect::<Vec<_>>()).into_array();
        let array_dtype = array.dtype().clone();
        let chunked =
            ChunkedArray::try_new(iter::repeat(array).take(5).collect(), array_dtype).unwrap();
        let len = chunked.len();
        // Byte and row offsets of every chunk boundary, both starting at the first chunk.
        let mut byte_offsets = vec![writer.tell()];
        let mut row_offsets = vec![0];
        let mut row_offset = 0;

        let mut chunk_stream = chunked.array_stream();
        while let Some(chunk) = chunk_stream.try_next().await.unwrap() {
            row_offset += chunk.len() as u64;
            row_offsets.push(row_offset);
            writer.write_batch(chunk).await.unwrap();
            byte_offsets.push(writer.tell());
        }
        // One flat layout per chunk. Consecutive offset pairs give each chunk's byte range and
        // row count.
        let flat_layouts = byte_offsets
            .iter()
            .zip(byte_offsets.iter().skip(1))
            .zip(
                row_offsets
                    .iter()
                    .zip(row_offsets.iter().skip(1))
                    .map(|(begin, end)| end - begin),
            )
            .map(|((begin, end), len)| write::Layout::flat(ByteRange::new(*begin, *end), len))
            .collect::<VecDeque<_>>();

        let written = writer.into_inner();

        let mut fb = FlatBufferBuilder::new();
        let chunked_layout = write::Layout::chunked(flat_layouts.into(), len as u64, false);
        let flat_buf = chunked_layout.write_flatbuffer(&mut fb);
        fb.finish_minimal(flat_buf);
        let fb_bytes = Bytes::copy_from_slice(fb.finished_data());

        // SAFETY: `fb_bytes` was just produced by `finish_minimal` with a `Layout` root.
        let fb_loc = (unsafe { root_unchecked::<footer::Layout>(&fb_bytes) })
            ._tab
            .loc();

        let dtype = Arc::new(LazilyDeserializedDType::from_dtype(PType::I32.into()));
        (
            ChunkedLayout::new(
                fb_bytes.clone(),
                fb_loc,
                scan,
                LayoutDeserializer::default(),
                RelativeLayoutCache::new(cache.clone(), dtype.clone()),
            ),
            ChunkedLayout::new(
                fb_bytes,
                fb_loc,
                Scan::new(None),
                LayoutDeserializer::default(),
                RelativeLayoutCache::new(cache, dtype),
            ),
            Bytes::from(written),
            len,
        )
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_range() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (mut filter_layout, mut projection_layout, buf, length) = layout_and_bytes(
            cache.clone(),
            Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new(
                Arc::new(Identity),
                Operator::Gt,
                Arc::new(Literal::new(10.into())),
            )))))),
        )
        .await;
        let arr = filter_read_layout(
            &mut filter_layout,
            &mut projection_layout,
            cache,
            &buf,
            length,
        )
        .pop_front()
        .expect("filtered read should yield at least one batch");

        assert_eq!(
            arr.into_primitive().unwrap().maybe_null_slice::<i32>(),
            &(11..100).collect::<Vec<_>>()
        );
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_range_no_filter() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (_, mut projection_layout, buf, length) =
            layout_and_bytes(cache.clone(), Scan::new(None)).await;
        let arr = read_layout(&mut projection_layout, cache, &buf, length)
            .pop_front()
            .expect("unfiltered read should yield at least one batch");

        assert_eq!(
            arr.into_primitive().unwrap().maybe_null_slice::<i32>(),
            (0..100).collect::<Vec<_>>()
        );
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_no_range() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (_, mut projection_layout, buf, _) =
            layout_and_bytes(cache.clone(), Scan::new(None)).await;
        let arr = read_layout_data(
            &mut projection_layout,
            cache,
            &buf,
            &RowMask::try_new(Bitmap::from_range(0..500), 0, 500).unwrap(),
        )
        .expect("reading all rows should yield data");

        assert_eq!(
            arr.into_primitive().unwrap().maybe_null_slice::<i32>(),
            iter::repeat(0..100).take(5).flatten().collect::<Vec<_>>()
        );
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_multiple_selectors() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (_, mut projection_layout, buf, _) =
            layout_and_bytes(cache.clone(), Scan::new(None)).await;
        // Each mask's bitmap is relative to its own `begin` row.
        let mut arr = [
            RowMask::try_new(Bitmap::from_range(0..150), 0, 200).unwrap(),
            RowMask::try_new(Bitmap::from_range(50..150), 200, 400).unwrap(),
            RowMask::try_new(Bitmap::from_range(0..100), 400, 500).unwrap(),
        ]
        .into_iter()
        .flat_map(|s| read_layout_data(&mut projection_layout, cache.clone(), &buf, &s))
        .collect::<VecDeque<_>>();

        assert_eq!(arr.len(), 3);
        assert_eq!(
            arr.pop_front()
                .unwrap()
                .into_primitive()
                .unwrap()
                .maybe_null_slice::<i32>(),
            &(0..100).chain(0..50).collect::<Vec<_>>()
        );
        assert_eq!(
            arr.pop_front()
                .unwrap()
                .into_primitive()
                .unwrap()
                .maybe_null_slice::<i32>(),
            &(50..100).chain(0..50).collect::<Vec<_>>()
        );
        assert_eq!(
            arr.pop_front()
                .unwrap()
                .into_primitive()
                .unwrap()
                .maybe_null_slice::<i32>(),
            &(0..100).collect::<Vec<_>>()
        );
    }
}
347        );
348    }
349}