1use std::{collections::VecDeque, sync::Arc, vec};
5
6use arrow_array::{
7 Array, ArrayRef, LargeBinaryArray, PrimitiveArray, StructArray, UInt64Array, cast::AsArray,
8 types::UInt64Type,
9};
10use arrow_buffer::{
11 BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer,
12};
13use arrow_schema::DataType;
14use bytes::Bytes;
15use futures::{FutureExt, future::BoxFuture};
16
17use lance_core::{Error, Result, datatypes::BLOB_DESC_FIELDS};
18
19use crate::{
20 EncodingsIo,
21 buffer::LanceBuffer,
22 decoder::{
23 DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
24 ScheduledScanLine, SchedulerContext,
25 },
26 encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
27 format::pb::{Blob, ColumnEncoding, column_encoding},
28 previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
29 repdef::RepDefBuilder,
30};
31
/// Field scheduler for blob columns.
///
/// Only the blob *descriptions* (a struct of `position` / `size` per row) are
/// scheduled as a regular column; this scheduler delegates all scheduling to
/// `descriptions_scheduler` and wraps each resulting description decoder in a
/// [`BlobFieldDecoder`] that fetches the referenced blob bytes.
#[derive(Debug)]
pub struct BlobFieldScheduler {
    /// Scheduler for the underlying descriptions column.
    descriptions_scheduler: Arc<dyn FieldScheduler>,
}
47
48impl BlobFieldScheduler {
49 pub fn new(descriptions_scheduler: Arc<dyn FieldScheduler>) -> Self {
50 Self {
51 descriptions_scheduler,
52 }
53 }
54}
55
/// Scheduling job produced by [`BlobFieldScheduler::schedule_ranges`].
///
/// Drives the inner descriptions job and converts each description decoder it
/// yields into a [`BlobFieldDecoder`].
#[derive(Debug)]
struct BlobFieldSchedulingJob<'a> {
    /// Job scheduling the descriptions column for the requested ranges.
    descriptions_job: Box<dyn SchedulingJob + 'a>,
}
60
61impl SchedulingJob for BlobFieldSchedulingJob<'_> {
62 fn schedule_next(
63 &mut self,
64 context: &mut SchedulerContext,
65 priority: &dyn PriorityRange,
66 ) -> Result<ScheduledScanLine> {
67 let next_descriptions = self.descriptions_job.schedule_next(context, priority)?;
68 let mut priority = priority.current_priority();
69 let decoders = next_descriptions.decoders.into_iter().map(|decoder| {
70 let decoder = decoder.into_legacy();
71 let path = decoder.path;
72 let mut decoder = decoder.decoder;
73 let num_rows = decoder.num_rows();
74 let descriptions_fut = async move {
75 decoder
76 .wait_for_loaded(decoder.num_rows() - 1)
77 .await
78 .unwrap();
79 let descriptions_task = decoder.drain(decoder.num_rows()).unwrap();
80 descriptions_task.task.decode().map(|(arr, _)| arr)
81 }
82 .boxed();
83 let decoder = Box::new(BlobFieldDecoder {
84 io: context.io().clone(),
85 unloaded_descriptions: Some(descriptions_fut),
86 positions: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
87 sizes: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
88 num_rows,
89 loaded: VecDeque::new(),
90 validity: VecDeque::new(),
91 rows_loaded: 0,
92 rows_drained: 0,
93 base_priority: priority,
94 });
95 priority += num_rows;
96 MessageType::DecoderReady(DecoderReady { decoder, path })
97 });
98 Ok(ScheduledScanLine {
99 decoders: decoders.collect(),
100 rows_scheduled: next_descriptions.rows_scheduled,
101 })
102 }
103
104 fn num_rows(&self) -> u64 {
105 self.descriptions_job.num_rows()
106 }
107}
108
109impl FieldScheduler for BlobFieldScheduler {
110 fn schedule_ranges<'a>(
111 &'a self,
112 ranges: &[std::ops::Range<u64>],
113 filter: &FilterExpression,
114 ) -> Result<Box<dyn SchedulingJob + 'a>> {
115 let descriptions_job = self
116 .descriptions_scheduler
117 .schedule_ranges(ranges, filter)?;
118 Ok(Box::new(BlobFieldSchedulingJob { descriptions_job }))
119 }
120
121 fn num_rows(&self) -> u64 {
122 self.descriptions_scheduler.num_rows()
123 }
124
125 fn initialize<'a>(
126 &'a self,
127 filter: &'a FilterExpression,
128 context: &'a SchedulerContext,
129 ) -> BoxFuture<'a, Result<()>> {
130 self.descriptions_scheduler.initialize(filter, context)
131 }
132}
133
/// Decoder that turns blob descriptions into a `LargeBinary` array by
/// fetching each row's bytes via `io`.
pub struct BlobFieldDecoder {
    /// I/O handle used to request the blob byte ranges.
    io: Arc<dyn EncodingsIo>,
    /// Pending descriptions; taken and awaited on the first `wait_for_loaded`.
    unloaded_descriptions: Option<BoxFuture<'static, Result<ArrayRef>>>,
    /// Column 0 of the descriptions struct (blob start offsets).
    positions: PrimitiveArray<UInt64Type>,
    /// Column 1 of the descriptions struct (blob lengths).
    /// `(position = 1, size = 0)` marks a null row.
    sizes: PrimitiveArray<UInt64Type>,
    /// Total rows handled by this decoder.
    num_rows: u64,
    /// Fetched blob payloads, one per loaded-but-not-yet-drained row.
    loaded: VecDeque<Bytes>,
    /// Validity bits, one buffer per successful load, consumed front to back.
    validity: VecDeque<BooleanBuffer>,
    /// Number of rows whose bytes have been fetched.
    rows_loaded: u64,
    /// Number of rows handed out via `drain`.
    rows_drained: u64,
    /// I/O priority of this decoder's first row; row offsets are added to it.
    base_priority: u64,
}
146
147impl BlobFieldDecoder {
148 fn drain_validity(&mut self, num_values: usize) -> Result<Option<NullBuffer>> {
149 let mut validity = BooleanBufferBuilder::new(num_values);
150 let mut remaining = num_values;
151 while remaining > 0 {
152 let next = self.validity.front_mut().unwrap();
153 if remaining < next.len() {
154 let slice = next.slice(0, remaining);
155 validity.append_buffer(&slice);
156 *next = next.slice(remaining, next.len() - remaining);
157 remaining = 0;
158 } else {
159 validity.append_buffer(next);
160 remaining -= next.len();
161 self.validity.pop_front();
162 }
163 }
164 let nulls = NullBuffer::new(validity.finish());
165 if nulls.null_count() == 0 {
166 Ok(None)
167 } else {
168 Ok(Some(nulls))
169 }
170 }
171}
172
173impl std::fmt::Debug for BlobFieldDecoder {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 f.debug_struct("BlobFieldDecoder")
176 .field("num_rows", &self.num_rows)
177 .field("rows_loaded", &self.rows_loaded)
178 .field("rows_drained", &self.rows_drained)
179 .finish()
180 }
181}
182
183impl LogicalPageDecoder for BlobFieldDecoder {
184 fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
185 async move {
186 if self.unloaded_descriptions.is_some() {
187 let descriptions = self.unloaded_descriptions.take().unwrap().await?;
188 let descriptions = descriptions.as_struct();
189 self.positions = descriptions.column(0).as_primitive().clone();
190 self.sizes = descriptions.column(1).as_primitive().clone();
191 }
192 let start = self.rows_loaded as usize;
193 let end = (loaded_need + 1).min(self.num_rows) as usize;
194 let positions = self.positions.values().slice(start, end - start);
195 let sizes = self.sizes.values().slice(start, end - start);
196 let ranges = positions
197 .iter()
198 .zip(sizes.iter())
199 .map(|(position, size)| *position..(*position + *size))
200 .collect::<Vec<_>>();
201 let validity = positions
202 .iter()
203 .zip(sizes.iter())
204 .map(|(p, s)| *p != 1 || *s != 0)
205 .collect::<BooleanBuffer>();
206 let bytes = self
210 .io
211 .submit_request(ranges, self.base_priority + start as u64)
212 .await?;
213 self.validity.push_back(validity);
214 self.loaded.extend(bytes);
215 self.rows_loaded = end as u64;
216 Ok(())
217 }
218 .boxed()
219 }
220
221 fn rows_loaded(&self) -> u64 {
222 self.rows_loaded
223 }
224
225 fn num_rows(&self) -> u64 {
226 self.num_rows
227 }
228
229 fn rows_drained(&self) -> u64 {
230 self.rows_drained
231 }
232
233 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
234 if num_rows as usize > self.loaded.len() {
235 return Err(Error::internal(format!(
239 "BlobFieldDecoder was asked to drain {num_rows} rows but only \
240 {} are loaded",
241 self.loaded.len(),
242 )));
243 }
244 let bytes = self.loaded.drain(0..num_rows as usize).collect::<Vec<_>>();
245 let validity = self.drain_validity(num_rows as usize)?;
246 self.rows_drained += num_rows;
247 Ok(NextDecodeTask {
248 num_rows,
249 task: Box::new(BlobArrayDecodeTask::new(bytes, validity)),
250 })
251 }
252
253 fn data_type(&self) -> &DataType {
254 &DataType::LargeBinary
255 }
256}
257
/// Decode task that assembles drained blob payloads into a `LargeBinaryArray`.
struct BlobArrayDecodeTask {
    /// One payload per row, in row order.
    bytes: Vec<Bytes>,
    /// Null mask for the rows, or `None` if every row is valid.
    validity: Option<NullBuffer>,
}
262
263impl BlobArrayDecodeTask {
264 fn new(bytes: Vec<Bytes>, validity: Option<NullBuffer>) -> Self {
265 Self { bytes, validity }
266 }
267}
268
269impl DecodeArrayTask for BlobArrayDecodeTask {
270 fn decode(self: Box<Self>) -> Result<(ArrayRef, u64)> {
271 let num_bytes = self.bytes.iter().map(|b| b.len()).sum::<usize>();
272 let offsets = self
273 .bytes
274 .iter()
275 .scan(0, |state, b| {
276 let start = *state;
277 *state += b.len();
278 Some(start as i64)
279 })
280 .chain(std::iter::once(num_bytes as i64))
281 .collect::<Vec<_>>();
282 let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
283 let mut buffer = Vec::with_capacity(num_bytes);
284 for bytes in self.bytes {
285 buffer.extend_from_slice(&bytes);
286 }
287 let data_buf = Buffer::from_vec(buffer);
288 Ok((
291 Arc::new(LargeBinaryArray::new(offsets, data_buf, self.validity)),
292 0,
293 ))
294 }
295}
296
/// Field encoder for blob columns.
///
/// Blob payloads are written out of line; only a per-row description struct
/// (`position`, `size`) is passed on to `description_encoder`.
pub struct BlobFieldEncoder {
    /// Encoder for the description struct column.
    description_encoder: Box<dyn FieldEncoder>,
}
300
301impl BlobFieldEncoder {
302 pub fn new(description_encoder: Box<dyn FieldEncoder>) -> Self {
303 Self {
304 description_encoder,
305 }
306 }
307
308 fn write_bins(array: ArrayRef, external_buffers: &mut OutOfLineBuffers) -> Result<ArrayRef> {
309 let binarray = array.as_binary_opt::<i64>().ok_or_else(|| {
310 Error::invalid_input_source(
311 format!("Expected large_binary and received {}", array.data_type()).into(),
312 )
313 })?;
314 let mut positions = Vec::with_capacity(array.len());
315 let mut sizes = Vec::with_capacity(array.len());
316 let data = binarray.values();
317 let nulls = binarray
318 .nulls()
319 .cloned()
320 .unwrap_or(NullBuffer::new_valid(binarray.len()));
321 for (w, is_valid) in binarray.value_offsets().windows(2).zip(nulls.into_iter()) {
322 if is_valid {
323 let start = w[0] as u64;
324 let end = w[1] as u64;
325 let size = end - start;
326 if size > 0 {
327 let val = data.slice_with_length(start as usize, size as usize);
328 let position = external_buffers.add_buffer(LanceBuffer::from(val));
329 positions.push(position);
330 sizes.push(size);
331 } else {
332 positions.push(0);
334 sizes.push(0);
335 }
336 } else {
337 positions.push(1);
339 sizes.push(0);
340 }
341 }
342 let positions = Arc::new(UInt64Array::from(positions));
343 let sizes = Arc::new(UInt64Array::from(sizes));
344 let descriptions = Arc::new(StructArray::new(
345 BLOB_DESC_FIELDS.clone(),
346 vec![positions, sizes],
347 None,
348 ));
349 Ok(descriptions)
350 }
351}
352
353impl FieldEncoder for BlobFieldEncoder {
354 fn maybe_encode(
355 &mut self,
356 array: ArrayRef,
357 external_buffers: &mut OutOfLineBuffers,
358 repdef: RepDefBuilder,
359 row_number: u64,
360 num_rows: u64,
361 ) -> Result<Vec<EncodeTask>> {
362 let descriptions = Self::write_bins(array, external_buffers)?;
363 self.description_encoder.maybe_encode(
364 descriptions,
365 external_buffers,
366 repdef,
367 row_number,
368 num_rows,
369 )
370 }
371
372 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
374 self.description_encoder.flush(external_buffers)
375 }
376
377 fn num_columns(&self) -> u32 {
378 self.description_encoder.num_columns()
379 }
380
381 fn finish(
382 &mut self,
383 external_buffers: &mut OutOfLineBuffers,
384 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
385 let inner_finished = self.description_encoder.finish(external_buffers);
386 async move {
387 let mut cols = inner_finished.await?;
388 assert_eq!(cols.len(), 1);
389 let encoding = std::mem::take(&mut cols[0].encoding);
390 let wrapped_encoding = ColumnEncoding {
391 column_encoding: Some(column_encoding::ColumnEncoding::Blob(Box::new(Blob {
392 inner: Some(Box::new(encoding)),
393 }))),
394 };
395 cols[0].encoding = wrapped_encoding;
396 Ok(cols)
397 }
398 .boxed()
399 }
400}
401
#[cfg(test)]
mod tests {
    use std::{
        collections::{HashMap, VecDeque},
        ops::Range,
        sync::{Arc, LazyLock},
    };

    use arrow_array::{LargeBinaryArray, PrimitiveArray, types::UInt64Type};
    use arrow_schema::{DataType, Field};
    use bytes::Bytes;
    use futures::{FutureExt, future::BoxFuture};
    use lance_arrow::BLOB_META_KEY;
    use lance_core::{Error, Result};

    use super::BlobFieldDecoder;
    use crate::{
        EncodingsIo,
        format::pb::column_encoding,
        previous::decoder::LogicalPageDecoder,
        testing::{TestCases, check_round_trip_encoding_of_data, check_specific_random},
        version::LanceFileVersion,
    };

    // Field metadata that tags a column as a blob column (`BLOB_META_KEY` -> "true").
    static BLOB_META: LazyLock<HashMap<String, String>> = LazyLock::new(|| {
        [(BLOB_META_KEY.to_string(), "true".to_string())]
            .iter()
            .cloned()
            .collect::<HashMap<_, _>>()
    });

    // Round-trips random non-nullable large-binary data tagged as blob, with the
    // max file version capped at V2_1.
    #[test_log::test(tokio::test)]
    async fn test_basic_blob() {
        let field = Field::new("", DataType::LargeBinary, false).with_metadata(BLOB_META.clone());
        check_specific_random(
            field,
            TestCases::basic().with_max_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    // Round-trips a small array that includes a null: first with blob metadata
    // (max version V2_1), then without metadata (min version V2_1).
    #[test_log::test(tokio::test)]
    async fn test_simple_blob() {
        let val1: &[u8] = &[1, 2, 3];
        let val2: &[u8] = &[7, 8, 9];
        let array = Arc::new(LargeBinaryArray::from(vec![Some(val1), None, Some(val2)]));
        let test_cases = TestCases::default()
            .with_max_file_version(LanceFileVersion::V2_1)
            .with_expected_encoding("packed_struct")
            .with_verify_encoding(Arc::new(|cols, version| {
                // Before V2_1, a blob field should be written as a single column
                // whose encoding is the `Blob` wrapper.
                if version < &LanceFileVersion::V2_1 {
                    assert_eq!(cols.len(), 1);
                    let col = &cols[0];
                    assert!(matches!(
                        col.encoding.column_encoding.as_ref().unwrap(),
                        column_encoding::ColumnEncoding::Blob(_)
                    ));
                }
            }));
        check_round_trip_encoding_of_data(vec![array.clone()], &test_cases, BLOB_META.clone())
            .await;

        // Without blob metadata the column must not use the `Blob` wrapper.
        // NOTE(review): if `with_min_file_version(V2_1)` limits runs to versions
        // >= V2_1, the `version < V2_1` branch below never executes and this
        // verification is a no-op; confirm intent.
        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_verify_encoding(Arc::new(|cols, version| {
                if version < &LanceFileVersion::V2_1 {
                    assert_eq!(cols.len(), 1);
                    let col = &cols[0];
                    assert!(!matches!(
                        col.encoding.column_encoding.as_ref().unwrap(),
                        column_encoding::ColumnEncoding::Blob(_)
                    ));
                }
            }));
        check_round_trip_encoding_of_data(vec![array], &test_cases, Default::default()).await;
    }

    // `EncodingsIo` whose every request fails, simulating a storage outage.
    #[derive(Debug)]
    struct FailingScheduler;

    impl EncodingsIo for FailingScheduler {
        fn submit_request(
            &self,
            _ranges: Vec<Range<u64>>,
            _priority: u64,
        ) -> BoxFuture<'static, Result<Vec<Bytes>>> {
            std::future::ready(Err(Error::io("simulated HTTP 503 from cloud storage"))).boxed()
        }
    }

    // A failed blob fetch must leave `rows_loaded`, `loaded` and `validity`
    // unchanged, and a following `drain` must error instead of returning data.
    #[test_log::test(tokio::test)]
    async fn test_io_failure_leaves_blob_decoder_consistent() {
        let num_rows = 8u64;
        // All-zero positions and sizes: every row is a valid, empty blob.
        let descs = PrimitiveArray::<UInt64Type>::from_iter_values(std::iter::repeat_n(
            0u64,
            num_rows as usize,
        ));

        // `unloaded_descriptions: None` makes the decoder use `positions`/`sizes`
        // directly, so the only I/O is the (failing) blob fetch.
        let mut decoder = BlobFieldDecoder {
            io: Arc::new(FailingScheduler),
            unloaded_descriptions: None,
            positions: descs.clone(),
            sizes: descs,
            num_rows,
            loaded: VecDeque::new(),
            validity: VecDeque::new(),
            rows_loaded: 0,
            rows_drained: 0,
            base_priority: 0,
        };

        assert!(decoder.wait_for_loaded(num_rows - 1).await.is_err());

        assert_eq!(decoder.rows_loaded, 0);
        assert!(decoder.loaded.is_empty());
        assert!(decoder.validity.is_empty());

        assert!(decoder.drain(num_rows).is_err());
    }
}