vortex_serde/file/read/layouts/
flat.rs

1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use vortex_array::{Array, Context};
6use vortex_error::{vortex_bail, VortexResult};
7use vortex_flatbuffers::footer;
8
9use crate::file::read::cache::RelativeLayoutCache;
10use crate::file::read::mask::RowMask;
11use crate::file::{
12    BatchRead, LayoutDeserializer, LayoutId, LayoutReader, LayoutSpec, Message, Scan,
13    FLAT_LAYOUT_ID,
14};
15use crate::messages::reader::ArrayMessageReader;
16use crate::stream_writer::ByteRange;
17
18#[derive(Debug)]
19pub struct FlatLayoutSpec;
20
21impl LayoutSpec for FlatLayoutSpec {
22    fn id(&self) -> LayoutId {
23        FLAT_LAYOUT_ID
24    }
25
26    fn layout_reader(
27        &self,
28        fb_bytes: Bytes,
29        fb_loc: usize,
30        scan: Scan,
31        layout_serde: LayoutDeserializer,
32        message_cache: RelativeLayoutCache,
33    ) -> VortexResult<Box<dyn LayoutReader>> {
34        let fb_layout = unsafe {
35            let tab = flatbuffers::Table::new(&fb_bytes, fb_loc);
36            footer::Layout::init_from_table(tab)
37        };
38        let buffers = fb_layout.buffers().unwrap_or_default();
39        if buffers.len() != 1 {
40            vortex_bail!("Flat layout can have exactly 1 buffer")
41        }
42        let buf = buffers.get(0);
43
44        Ok(Box::new(FlatLayout::new(
45            ByteRange::new(buf.begin(), buf.end()),
46            scan,
47            layout_serde.ctx(),
48            message_cache,
49        )))
50    }
51}
52
53#[derive(Debug)]
54pub struct FlatLayout {
55    range: ByteRange,
56    scan: Scan,
57    ctx: Arc<Context>,
58    message_cache: RelativeLayoutCache,
59}
60
61impl FlatLayout {
62    pub fn new(
63        range: ByteRange,
64        scan: Scan,
65        ctx: Arc<Context>,
66        message_cache: RelativeLayoutCache,
67    ) -> Self {
68        Self {
69            range,
70            scan,
71            ctx,
72            message_cache,
73        }
74    }
75
76    fn own_message(&self) -> Message {
77        (self.message_cache.absolute_id(&[]), self.range)
78    }
79
80    fn array_from_bytes(&self, mut buf: Bytes) -> VortexResult<Array> {
81        let mut array_reader = ArrayMessageReader::new();
82        let mut read_buf = Bytes::new();
83        while let Some(u) = array_reader.read(read_buf)? {
84            read_buf = buf.split_to(u);
85        }
86        array_reader.into_array(
87            self.ctx.clone(),
88            self.message_cache.dtype().value()?.clone(),
89        )
90    }
91}
92
93impl LayoutReader for FlatLayout {
94    fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet<usize>) -> VortexResult<()> {
95        splits.insert(row_offset);
96        Ok(())
97    }
98
99    fn read_selection(&mut self, selection: &RowMask) -> VortexResult<Option<BatchRead>> {
100        if let Some(buf) = self.message_cache.get(&[]) {
101            let array = self.array_from_bytes(buf)?;
102            selection
103                .filter_array(array)?
104                .map(|s| {
105                    Ok(BatchRead::Batch(
106                        self.scan
107                            .expr
108                            .as_ref()
109                            .map(|e| e.evaluate(&s))
110                            .transpose()?
111                            .unwrap_or(s),
112                    ))
113                })
114                .transpose()
115        } else {
116            Ok(Some(BatchRead::ReadMore(vec![self.own_message()])))
117        }
118    }
119}
120
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use bytes::Bytes;
    use vortex_array::array::PrimitiveArray;
    use vortex_array::{Context, IntoArray, IntoArrayVariant};
    use vortex_dtype::PType;
    use vortex_expr::{BinaryExpr, Identity, Literal, Operator};

    use crate::file::read::cache::{LazilyDeserializedDType, RelativeLayoutCache};
    use crate::file::read::layouts::flat::FlatLayout;
    use crate::file::read::layouts::test_read::{filter_read_layout, read_layout};
    use crate::file::{LayoutMessageCache, RowFilter, Scan};
    use crate::messages::writer::MessageWriter;
    use crate::stream_writer::ByteRange;

    /// Creates an empty, shareable message cache.
    fn empty_cache() -> Arc<RwLock<LayoutMessageCache>> {
        Arc::new(RwLock::new(LayoutMessageCache::default()))
    }

    /// Builds a scan whose row filter is `identity > threshold`.
    fn greater_than_scan(threshold: i32) -> Scan {
        let predicate = BinaryExpr::new(
            Arc::new(Identity),
            Operator::Gt,
            Arc::new(Literal::new(threshold.into())),
        );
        Scan::new(Some(Arc::new(RowFilter::new(Arc::new(predicate)))))
    }

    /// Writes the values `0..100` as a single batch and returns a projection-only layout over
    /// the written bytes, the bytes themselves, the row count and the dtype.
    async fn read_only_layout(
        cache: Arc<RwLock<LayoutMessageCache>>,
    ) -> (FlatLayout, Bytes, usize, Arc<LazilyDeserializedDType>) {
        let array = PrimitiveArray::from((0..100).collect::<Vec<_>>()).into_array();
        let row_count = array.len();

        let mut writer = MessageWriter::new(Vec::new());
        writer.write_batch(array).await.unwrap();
        let bytes = Bytes::from(writer.into_inner());

        let dtype = Arc::new(LazilyDeserializedDType::from_dtype(PType::I32.into()));
        let layout = FlatLayout::new(
            ByteRange::new(0, bytes.len() as u64),
            Scan::new(None),
            Arc::new(Context::default()),
            RelativeLayoutCache::new(cache, dtype.clone()),
        );

        (layout, bytes, row_count, dtype)
    }

    /// Returns a layout evaluating `scan` plus a projection-only layout, both over the same
    /// written bytes, followed by those bytes and the row count.
    async fn layout_and_bytes(
        cache: Arc<RwLock<LayoutMessageCache>>,
        scan: Scan,
    ) -> (FlatLayout, FlatLayout, Bytes, usize) {
        let (projection_layout, bytes, row_count, dtype) = read_only_layout(cache.clone()).await;
        let scan_layout = FlatLayout::new(
            ByteRange::new(0, bytes.len() as u64),
            scan,
            Arc::new(Context::default()),
            RelativeLayoutCache::new(cache, dtype),
        );

        (scan_layout, projection_layout, bytes, row_count)
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_range() {
        let cache = empty_cache();
        let (mut filter_layout, mut projection_layout, buf, length) =
            layout_and_bytes(cache.clone(), greater_than_scan(10)).await;

        let first_batch = filter_read_layout(
            &mut filter_layout,
            &mut projection_layout,
            cache,
            &buf,
            length,
        )
        .pop_front()
        .expect("filter `> 10` should keep some rows");

        let primitive = first_batch.into_primitive().unwrap();
        assert_eq!(
            primitive.maybe_null_slice::<i32>(),
            &(11..100).collect::<Vec<_>>()
        );
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_range_no_filter() {
        let cache = empty_cache();
        let (mut data_layout, buf, length, ..) = read_only_layout(cache.clone()).await;

        let first_batch = read_layout(&mut data_layout, cache, &buf, length)
            .pop_front()
            .expect("an unfiltered read should produce a batch");

        let primitive = first_batch.into_primitive().unwrap();
        assert_eq!(
            primitive.maybe_null_slice::<i32>(),
            &(0..100).collect::<Vec<_>>()
        );
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_empty() {
        let cache = empty_cache();
        let (mut filter_layout, mut projection_layout, buf, length) =
            layout_and_bytes(cache.clone(), greater_than_scan(101)).await;

        let first_batch = filter_read_layout(
            &mut filter_layout,
            &mut projection_layout,
            cache,
            &buf,
            length,
        )
        .pop_front();

        // No value in 0..100 exceeds 101, so nothing may be produced.
        assert!(first_batch.is_none());
    }
}
251}