vortex_serde/file/read/layouts/
flat.rs1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use vortex_array::{Array, Context};
6use vortex_error::{vortex_bail, VortexResult};
7use vortex_flatbuffers::footer;
8
9use crate::file::read::cache::RelativeLayoutCache;
10use crate::file::read::mask::RowMask;
11use crate::file::{
12 BatchRead, LayoutDeserializer, LayoutId, LayoutReader, LayoutSpec, Message, Scan,
13 FLAT_LAYOUT_ID,
14};
15use crate::messages::reader::ArrayMessageReader;
16use crate::stream_writer::ByteRange;
17
18#[derive(Debug)]
19pub struct FlatLayoutSpec;
20
21impl LayoutSpec for FlatLayoutSpec {
22 fn id(&self) -> LayoutId {
23 FLAT_LAYOUT_ID
24 }
25
26 fn layout_reader(
27 &self,
28 fb_bytes: Bytes,
29 fb_loc: usize,
30 scan: Scan,
31 layout_serde: LayoutDeserializer,
32 message_cache: RelativeLayoutCache,
33 ) -> VortexResult<Box<dyn LayoutReader>> {
34 let fb_layout = unsafe {
35 let tab = flatbuffers::Table::new(&fb_bytes, fb_loc);
36 footer::Layout::init_from_table(tab)
37 };
38 let buffers = fb_layout.buffers().unwrap_or_default();
39 if buffers.len() != 1 {
40 vortex_bail!("Flat layout can have exactly 1 buffer")
41 }
42 let buf = buffers.get(0);
43
44 Ok(Box::new(FlatLayout::new(
45 ByteRange::new(buf.begin(), buf.end()),
46 scan,
47 layout_serde.ctx(),
48 message_cache,
49 )))
50 }
51}
52
53#[derive(Debug)]
54pub struct FlatLayout {
55 range: ByteRange,
56 scan: Scan,
57 ctx: Arc<Context>,
58 message_cache: RelativeLayoutCache,
59}
60
61impl FlatLayout {
62 pub fn new(
63 range: ByteRange,
64 scan: Scan,
65 ctx: Arc<Context>,
66 message_cache: RelativeLayoutCache,
67 ) -> Self {
68 Self {
69 range,
70 scan,
71 ctx,
72 message_cache,
73 }
74 }
75
76 fn own_message(&self) -> Message {
77 (self.message_cache.absolute_id(&[]), self.range)
78 }
79
80 fn array_from_bytes(&self, mut buf: Bytes) -> VortexResult<Array> {
81 let mut array_reader = ArrayMessageReader::new();
82 let mut read_buf = Bytes::new();
83 while let Some(u) = array_reader.read(read_buf)? {
84 read_buf = buf.split_to(u);
85 }
86 array_reader.into_array(
87 self.ctx.clone(),
88 self.message_cache.dtype().value()?.clone(),
89 )
90 }
91}
92
93impl LayoutReader for FlatLayout {
94 fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet<usize>) -> VortexResult<()> {
95 splits.insert(row_offset);
96 Ok(())
97 }
98
99 fn read_selection(&mut self, selection: &RowMask) -> VortexResult<Option<BatchRead>> {
100 if let Some(buf) = self.message_cache.get(&[]) {
101 let array = self.array_from_bytes(buf)?;
102 selection
103 .filter_array(array)?
104 .map(|s| {
105 Ok(BatchRead::Batch(
106 self.scan
107 .expr
108 .as_ref()
109 .map(|e| e.evaluate(&s))
110 .transpose()?
111 .unwrap_or(s),
112 ))
113 })
114 .transpose()
115 } else {
116 Ok(Some(BatchRead::ReadMore(vec![self.own_message()])))
117 }
118 }
119}
120
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use bytes::Bytes;
    use vortex_array::array::PrimitiveArray;
    use vortex_array::{Context, IntoArray, IntoArrayVariant};
    use vortex_dtype::PType;
    use vortex_expr::{BinaryExpr, Identity, Literal, Operator};

    use crate::file::read::cache::{LazilyDeserializedDType, RelativeLayoutCache};
    use crate::file::read::layouts::flat::FlatLayout;
    use crate::file::read::layouts::test_read::{filter_read_layout, read_layout};
    use crate::file::{LayoutMessageCache, RowFilter, Scan};
    use crate::messages::writer::MessageWriter;
    use crate::stream_writer::ByteRange;

    /// Builds a `FlatLayout` over bytes `0..byte_len` that applies `scan`.
    fn flat_layout(
        byte_len: usize,
        scan: Scan,
        cache: Arc<RwLock<LayoutMessageCache>>,
        dtype: Arc<LazilyDeserializedDType>,
    ) -> FlatLayout {
        FlatLayout::new(
            ByteRange::new(0, byte_len as u64),
            scan,
            Arc::new(Context::default()),
            RelativeLayoutCache::new(cache, dtype),
        )
    }

    /// Builds a scan whose row filter is `identity > threshold`.
    fn greater_than(threshold: i32) -> Scan {
        let predicate = BinaryExpr::new(
            Arc::new(Identity),
            Operator::Gt,
            Arc::new(Literal::new(threshold.into())),
        );
        Scan::new(Some(Arc::new(RowFilter::new(Arc::new(predicate)))))
    }

    /// Serializes the `i32` values `0..100` and returns four things: an unfiltered layout over
    /// them, the encoded bytes, the row count, and the dtype handed to the layout's cache.
    async fn read_only_layout(
        cache: Arc<RwLock<LayoutMessageCache>>,
    ) -> (FlatLayout, Bytes, usize, Arc<LazilyDeserializedDType>) {
        let values = PrimitiveArray::from((0..100).collect::<Vec<_>>()).into_array();
        let row_count = values.len();

        let mut writer = MessageWriter::new(Vec::new());
        writer.write_batch(values).await.unwrap();
        let encoded = Bytes::from(writer.into_inner());

        let dtype = Arc::new(LazilyDeserializedDType::from_dtype(PType::I32.into()));
        let layout = flat_layout(encoded.len(), Scan::new(None), cache, Arc::clone(&dtype));
        (layout, encoded, row_count, dtype)
    }

    /// Returns `(filter_layout, projection_layout, bytes, len)`.
    ///
    /// The first layout applies `scan` and the second is unfiltered. Both read the same
    /// encoded values and share `cache`.
    async fn layout_and_bytes(
        cache: Arc<RwLock<LayoutMessageCache>>,
        scan: Scan,
    ) -> (FlatLayout, FlatLayout, Bytes, usize) {
        let (projection, bytes, len, dtype) = read_only_layout(Arc::clone(&cache)).await;
        let filter = flat_layout(bytes.len(), scan, cache, dtype);
        (filter, projection, bytes, len)
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_range() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (mut filter_layout, mut projection_layout, buf, length) =
            layout_and_bytes(Arc::clone(&cache), greater_than(10)).await;

        let first =
            filter_read_layout(&mut filter_layout, &mut projection_layout, cache, &buf, length)
                .pop_front();
        assert!(first.is_some());

        let values = first.unwrap().into_primitive().unwrap();
        assert_eq!(
            values.maybe_null_slice::<i32>(),
            &(11..100).collect::<Vec<_>>()
        );
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_range_no_filter() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (mut data_layout, buf, length, ..) = read_only_layout(Arc::clone(&cache)).await;

        let first = read_layout(&mut data_layout, cache, &buf, length).pop_front();
        assert!(first.is_some());

        let values = first.unwrap().into_primitive().unwrap();
        assert_eq!(
            values.maybe_null_slice::<i32>(),
            &(0..100).collect::<Vec<_>>()
        );
    }

    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_empty() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (mut filter_layout, mut projection_layout, buf, length) =
            layout_and_bytes(Arc::clone(&cache), greater_than(101)).await;

        let first =
            filter_read_layout(&mut filter_layout, &mut projection_layout, cache, &buf, length)
                .pop_front();
        assert!(first.is_none());
    }
}