lance_encoding/encodings/logical/primitive/
blob.rs1use std::{collections::VecDeque, ops::Range, sync::Arc};
10
11use arrow_array::{cast::AsArray, make_array, Array, UInt64Array};
12use bytes::Bytes;
13use futures::{future::BoxFuture, FutureExt};
14use snafu::location;
15
16use lance_core::{
17 cache::DeepSizeOf, datatypes::BLOB_DESC_TYPE, error::LanceOptionExt, Error, Result,
18};
19
20use crate::{
21 buffer::LanceBuffer,
22 data::{BlockInfo, DataBlock, VariableWidthBlock},
23 decoder::{DecodePageTask, DecodedPage, StructuralPageDecoder},
24 encodings::logical::primitive::{CachedPageData, PageLoadTask, StructuralPageScheduler},
25 repdef::{DefinitionInterpretation, RepDefUnraveler},
26 EncodingsIo,
27};
28
/// Byte threshold used when grouping blob reads into page load tasks (32 MiB).
///
/// `BlobPageScheduler::schedule_ranges` closes the task it is building as soon as
/// the payload bytes accumulated for that task reach this value.
pub const TARGET_SHARD_SIZE: u64 = 32 * 1024 * 1024;
35
36#[derive(Debug)]
37pub(super) struct BlobDescriptionPageScheduler {
38 inner_scheduler: Box<dyn StructuralPageScheduler>,
39 def_meaning: Arc<[DefinitionInterpretation]>,
40}
41
42impl BlobDescriptionPageScheduler {
43 pub fn new(
44 inner_scheduler: Box<dyn StructuralPageScheduler>,
45 def_meaning: Arc<[DefinitionInterpretation]>,
46 ) -> Self {
47 Self {
48 inner_scheduler,
49 def_meaning,
50 }
51 }
52
53 fn wrap_decoder_fut(
54 decoder_fut: BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>>,
55 def_meaning: Arc<[DefinitionInterpretation]>,
56 ) -> BoxFuture<'static, Result<Box<dyn StructuralPageDecoder>>> {
57 async move {
58 let decoder = decoder_fut.await?;
59 Ok(
60 Box::new(BlobDescriptionPageDecoder::new(decoder, def_meaning))
61 as Box<dyn StructuralPageDecoder>,
62 )
63 }
64 .boxed()
65 }
66}
67
68impl StructuralPageScheduler for BlobDescriptionPageScheduler {
69 fn initialize<'a>(
70 &'a mut self,
71 io: &Arc<dyn EncodingsIo>,
72 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
73 self.inner_scheduler.initialize(io)
74 }
75
76 fn load(&mut self, data: &Arc<dyn CachedPageData>) {
77 self.inner_scheduler.load(data);
78 }
79
80 fn schedule_ranges(
81 &self,
82 ranges: &[Range<u64>],
83 io: &Arc<dyn EncodingsIo>,
84 ) -> Result<Vec<PageLoadTask>> {
85 let tasks = self.inner_scheduler.schedule_ranges(ranges, io)?;
86 Ok(tasks
87 .into_iter()
88 .map(|task| PageLoadTask {
89 decoder_fut: Self::wrap_decoder_fut(task.decoder_fut, self.def_meaning.clone()),
90 num_rows: task.num_rows,
91 })
92 .collect())
93 }
94}
95
96#[derive(Debug)]
97struct BlobDescriptionPageDecoder {
98 inner: Box<dyn StructuralPageDecoder>,
99 def_meaning: Arc<[DefinitionInterpretation]>,
100}
101
102impl BlobDescriptionPageDecoder {
103 fn new(
104 inner: Box<dyn StructuralPageDecoder>,
105 def_meaning: Arc<[DefinitionInterpretation]>,
106 ) -> Self {
107 Self { inner, def_meaning }
108 }
109}
110
111impl StructuralPageDecoder for BlobDescriptionPageDecoder {
112 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
113 Ok(Box::new(BlobDescriptionDecodePageTask::new(
114 self.inner.drain(num_rows)?,
115 self.def_meaning.clone(),
116 )))
117 }
118
119 fn num_rows(&self) -> u64 {
120 self.inner.num_rows()
121 }
122}
123
124#[derive(Debug)]
125struct BlobDescriptionDecodePageTask {
126 inner: Box<dyn DecodePageTask>,
127 def_meaning: Arc<[DefinitionInterpretation]>,
128}
129
130impl BlobDescriptionDecodePageTask {
131 fn new(inner: Box<dyn DecodePageTask>, def_meaning: Arc<[DefinitionInterpretation]>) -> Self {
132 Self { inner, def_meaning }
133 }
134}
135
136impl DecodePageTask for BlobDescriptionDecodePageTask {
137 fn decode(self: Box<Self>) -> Result<DecodedPage> {
138 let decoded = self.inner.decode()?;
139 let num_values = decoded.data.num_values();
140
141 let DataBlock::Struct(descriptions) = &decoded.data else {
143 return Err(Error::Internal {
144 message: "Expected struct data block for descriptions".into(),
145 location: location!(),
146 });
147 };
148 let mut description_children = descriptions.children.iter();
149 let DataBlock::FixedWidth(positions) = description_children.next().expect_ok()? else {
150 return Err(Error::Internal {
151 message: "Expected fixed width data block for positions".into(),
152 location: location!(),
153 });
154 };
155 let DataBlock::FixedWidth(sizes) = description_children.next().expect_ok()? else {
156 return Err(Error::Internal {
157 message: "Expected fixed width data block for sizes".into(),
158 location: location!(),
159 });
160 };
161 let positions = positions.data.borrow_to_typed_slice::<u64>();
162 let sizes = sizes.data.borrow_to_typed_slice::<u64>();
163
164 let mut rep = Vec::with_capacity(num_values as usize);
165 let mut def = Vec::with_capacity(num_values as usize);
166
167 for (position, size) in positions.iter().copied().zip(sizes.iter().copied()) {
168 if size == 0 {
169 if position == 0 {
170 rep.push(0);
171 def.push(0);
172 } else {
173 let repval = (position & 0xFFFF) as u16;
174 let defval = ((position >> 16) & 0xFFFF) as u16;
175 rep.push(repval);
176 def.push(defval);
177 }
178 } else {
179 rep.push(0);
180 def.push(0);
181 }
182 }
183
184 let rep = if rep.iter().any(|r| *r != 0) {
185 Some(rep)
186 } else {
187 None
188 };
189 let def = if self.def_meaning.len() > 1
190 || self.def_meaning[0] != DefinitionInterpretation::AllValidItem
191 {
192 Some(def)
193 } else {
194 None
195 };
196
197 let repdef =
198 RepDefUnraveler::new(rep, def, self.def_meaning.clone(), positions.len() as u64);
199
200 Ok(DecodedPage {
201 data: decoded.data,
202 repdef,
203 })
204 }
205}
206
207struct BlobCacheableState {
208 positions: Arc<UInt64Array>,
209 sizes: Arc<UInt64Array>,
210 inner_state: Arc<dyn CachedPageData>,
211}
212
213impl DeepSizeOf for BlobCacheableState {
214 fn deep_size_of_children(&self, context: &mut lance_core::cache::Context) -> usize {
215 self.positions.get_array_memory_size()
216 + self.sizes.get_array_memory_size()
217 + self.inner_state.deep_size_of_children(context)
218 }
219}
220
221impl CachedPageData for BlobCacheableState {
222 fn as_arc_any(self: Arc<Self>) -> Arc<dyn std::any::Any + Send + Sync + 'static> {
223 self
224 }
225}
226
227#[derive(Debug)]
228pub(super) struct BlobPageScheduler {
229 inner_scheduler: Box<dyn StructuralPageScheduler>,
230 row_number: u64,
231 num_rows: u64,
232 def_meaning: Arc<[DefinitionInterpretation]>,
233 positions: Option<Arc<UInt64Array>>,
234 sizes: Option<Arc<UInt64Array>>,
235}
236
237impl BlobPageScheduler {
238 pub fn new(
239 inner_scheduler: Box<dyn StructuralPageScheduler>,
240 row_number: u64,
241 num_rows: u64,
242 def_meaning: Arc<[DefinitionInterpretation]>,
243 ) -> Self {
244 Self {
245 inner_scheduler,
246 row_number,
247 num_rows,
248 def_meaning,
249 positions: None,
250 sizes: None,
251 }
252 }
253
254 fn create_page_load_task(
255 ranges_to_read: Vec<Range<u64>>,
256 mut loaded_blobs: Vec<LoadedBlob>,
257 first_row_number: u64,
258 io: &dyn EncodingsIo,
259 def_meaning: Arc<[DefinitionInterpretation]>,
260 ) -> Result<PageLoadTask> {
261 let num_rows = loaded_blobs.len() as u64;
262 let read_fut = io.submit_request(ranges_to_read, first_row_number);
263 let decoder_fut = async move {
264 let bytes = read_fut.await?;
265 let mut bytes_iter = bytes.into_iter();
266 for blob in loaded_blobs.iter_mut() {
267 if blob.def == 0 {
268 blob.set_bytes(bytes_iter.next().expect_ok()?);
269 }
270 }
271 debug_assert!(bytes_iter.next().is_none());
272 Ok(Box::new(BlobPageDecoder::new(loaded_blobs, def_meaning))
273 as Box<dyn StructuralPageDecoder>)
274 }
275 .boxed();
276 Ok(PageLoadTask {
277 decoder_fut,
278 num_rows,
279 })
280 }
281}
282
283impl StructuralPageScheduler for BlobPageScheduler {
284 fn initialize<'a>(
285 &'a mut self,
286 io: &Arc<dyn EncodingsIo>,
287 ) -> BoxFuture<'a, Result<Arc<dyn CachedPageData>>> {
288 let io = io.clone();
289 let num_rows = self.num_rows;
290 async move {
291 let cached = self.inner_scheduler.initialize(&io).await?;
292 let mut desc_decoders = self.inner_scheduler.schedule_ranges(&[0..num_rows], &io)?;
293 if desc_decoders.len() != 1 {
294 return Err(Error::NotSupported {
298 source: "Expected exactly one descriptor decoder".into(),
299 location: location!(),
300 });
301 }
302 let desc_decoder_task = desc_decoders.pop().unwrap();
303 let mut desc_decoder = desc_decoder_task.decoder_fut.await?;
304
305 let descs = desc_decoder.drain(desc_decoder_task.num_rows)?;
306 let descs = descs.decode()?;
307 let descs = make_array(descs.data.into_arrow(BLOB_DESC_TYPE.clone(), true)?);
308 let descs = descs.as_struct();
309 let positions = Arc::new(
310 descs
311 .column(0)
312 .as_any()
313 .downcast_ref::<UInt64Array>()
314 .unwrap()
315 .clone(),
316 );
317 let sizes = Arc::new(
318 descs
319 .column(1)
320 .as_any()
321 .downcast_ref::<UInt64Array>()
322 .unwrap()
323 .clone(),
324 );
325 self.positions = Some(positions.clone());
326 self.sizes = Some(sizes.clone());
327 let state = Arc::new(BlobCacheableState {
328 inner_state: cached,
329 positions,
330 sizes,
331 });
332 Ok(state as Arc<dyn CachedPageData>)
333 }
334 .boxed()
335 }
336
337 fn load(&mut self, data: &Arc<dyn CachedPageData>) {
338 let blob_state = data
339 .clone()
340 .as_arc_any()
341 .downcast::<BlobCacheableState>()
342 .unwrap();
343 self.positions = Some(blob_state.positions.clone());
344 self.sizes = Some(blob_state.sizes.clone());
345 self.inner_scheduler.load(&blob_state.inner_state);
346 }
347
348 fn schedule_ranges(
349 &self,
350 ranges: &[Range<u64>],
351 io: &Arc<dyn EncodingsIo>,
352 ) -> Result<Vec<PageLoadTask>> {
353 let num_rows: u64 = ranges.iter().map(|r| r.end - r.start).sum();
354
355 let positions = self.positions.as_ref().expect_ok()?;
356 let sizes = self.sizes.as_ref().expect_ok()?;
357
358 let mut page_load_tasks = Vec::new();
359 let mut bytes_so_far = 0;
360 let mut ranges_to_read = Vec::with_capacity(num_rows as usize);
361 let mut loaded_blobs = Vec::with_capacity(num_rows as usize);
362 let mut first_row_number = None;
363 for range in ranges {
364 for row in range.start..range.end {
365 if first_row_number.is_none() {
366 first_row_number = Some(row + self.row_number);
367 }
368 let position = positions.value(row as usize);
369 let size = sizes.value(row as usize);
370
371 if size == 0 {
372 let rep = (position & 0xFFFF) as u16;
373 let def = ((position >> 16) & 0xFFFF) as u16;
374 loaded_blobs.push(LoadedBlob::new(rep, def));
375 } else {
376 loaded_blobs.push(LoadedBlob::new(0, 0));
377 ranges_to_read.push(position..(position + size));
378 bytes_so_far += size;
379 }
380
381 if bytes_so_far >= TARGET_SHARD_SIZE {
382 let page_load_task = Self::create_page_load_task(
383 std::mem::take(&mut ranges_to_read),
384 std::mem::take(&mut loaded_blobs),
385 first_row_number.unwrap(),
386 io.as_ref(),
387 self.def_meaning.clone(),
388 )?;
389 page_load_tasks.push(page_load_task);
390 bytes_so_far = 0;
391 first_row_number = None;
392 }
393 }
394 }
395 if !loaded_blobs.is_empty() {
396 let page_load_task = Self::create_page_load_task(
397 std::mem::take(&mut ranges_to_read),
398 std::mem::take(&mut loaded_blobs),
399 first_row_number.unwrap(),
400 io.as_ref(),
401 self.def_meaning.clone(),
402 )?;
403 page_load_tasks.push(page_load_task);
404 }
405
406 Ok(page_load_tasks)
407 }
408}
409
410#[derive(Debug)]
411struct LoadedBlob {
412 bytes: Option<Bytes>,
413 rep: u16,
414 def: u16,
415}
416
417impl LoadedBlob {
418 fn new(rep: u16, def: u16) -> Self {
419 Self {
420 bytes: None,
421 rep,
422 def,
423 }
424 }
425
426 fn set_bytes(&mut self, bytes: Bytes) {
427 self.bytes = Some(bytes);
428 }
429}
430
431#[derive(Debug)]
432struct BlobPageDecoder {
433 blobs: VecDeque<LoadedBlob>,
434 def_meaning: Arc<[DefinitionInterpretation]>,
435 num_rows: u64,
436}
437
438impl BlobPageDecoder {
439 fn new(blobs: Vec<LoadedBlob>, def_meaning: Arc<[DefinitionInterpretation]>) -> Self {
440 Self {
441 num_rows: blobs.len() as u64,
442 blobs: blobs.into_iter().collect(),
443 def_meaning,
444 }
445 }
446}
447
448impl StructuralPageDecoder for BlobPageDecoder {
449 fn drain(&mut self, num_rows: u64) -> Result<Box<dyn DecodePageTask>> {
450 let blobs = self.blobs.drain(0..num_rows as usize).collect::<Vec<_>>();
451 Ok(Box::new(BlobDecodePageTask::new(
452 blobs,
453 self.def_meaning.clone(),
454 )))
455 }
456
457 fn num_rows(&self) -> u64 {
458 self.num_rows
459 }
460}
461
462#[derive(Debug)]
463struct BlobDecodePageTask {
464 blobs: Vec<LoadedBlob>,
465 def_meaning: Arc<[DefinitionInterpretation]>,
466}
467
468impl BlobDecodePageTask {
469 fn new(blobs: Vec<LoadedBlob>, def_meaning: Arc<[DefinitionInterpretation]>) -> Self {
470 Self { blobs, def_meaning }
471 }
472}
473
474impl DecodePageTask for BlobDecodePageTask {
475 fn decode(self: Box<Self>) -> Result<DecodedPage> {
476 let num_values = self.blobs.len() as u64;
477 let num_bytes = self
478 .blobs
479 .iter()
480 .filter_map(|b| b.bytes.as_ref())
481 .map(|b| b.len())
482 .sum::<usize>();
483 let mut buffer = Vec::with_capacity(num_bytes);
484 let mut offsets = Vec::with_capacity(num_values as usize + 1);
485 let mut rep = Vec::with_capacity(num_values as usize);
486 let mut def = Vec::with_capacity(num_values as usize);
487 offsets.push(0_u64);
488 for blob in self.blobs {
489 rep.push(blob.rep);
490 def.push(blob.def);
491 if let Some(bytes) = blob.bytes {
492 offsets.push(offsets.last().unwrap() + bytes.len() as u64);
493 buffer.extend_from_slice(&bytes);
494 } else {
495 offsets.push(*offsets.last().unwrap());
497 }
498 }
499 let offsets = LanceBuffer::reinterpret_vec(offsets);
500 let data = LanceBuffer::from(buffer);
501 let data_block = DataBlock::VariableWidth(VariableWidthBlock {
502 data,
503 offsets,
504 bits_per_offset: 64,
505 num_values,
506 block_info: BlockInfo::new(),
507 });
508
509 let rep = if rep.iter().any(|r| *r != 0) {
510 Some(rep)
511 } else {
512 None
513 };
514 let def = if self.def_meaning.len() > 1
515 || self.def_meaning[0] != DefinitionInterpretation::AllValidItem
516 {
517 Some(def)
518 } else {
519 None
520 };
521
522 Ok(DecodedPage {
523 data: data_block,
524 repdef: RepDefUnraveler::new(rep, def, self.def_meaning, num_values),
525 })
526 }
527}