1use std::{collections::VecDeque, sync::Arc, vec};
5
6use arrow_array::{
7 Array, ArrayRef, LargeBinaryArray, PrimitiveArray, StructArray, UInt64Array, cast::AsArray,
8 types::UInt64Type,
9};
10use arrow_buffer::{
11 BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer,
12};
13use arrow_schema::DataType;
14use bytes::Bytes;
15use futures::{FutureExt, future::BoxFuture};
16
17use lance_core::{Error, Result, datatypes::BLOB_DESC_FIELDS};
18
19use crate::{
20 EncodingsIo,
21 buffer::LanceBuffer,
22 decoder::{
23 DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
24 ScheduledScanLine, SchedulerContext,
25 },
26 encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
27 format::pb::{Blob, ColumnEncoding, column_encoding},
28 previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
29 repdef::RepDefBuilder,
30};
31
/// Scheduler for a blob column.
///
/// A blob column stores only a "descriptions" struct column inline (the
/// position/size pairs of the out-of-line payloads), so scheduling a blob
/// column delegates entirely to the scheduler of that descriptions column.
#[derive(Debug)]
pub struct BlobFieldScheduler {
    descriptions_scheduler: Arc<dyn FieldScheduler>,
}
47
48impl BlobFieldScheduler {
49 pub fn new(descriptions_scheduler: Arc<dyn FieldScheduler>) -> Self {
50 Self {
51 descriptions_scheduler,
52 }
53 }
54}
55
/// Scheduling job for a blob column.
///
/// Wraps the scheduling job of the descriptions column; each descriptions
/// decoder it produces is wrapped in a [`BlobFieldDecoder`] in
/// `schedule_next`.
#[derive(Debug)]
struct BlobFieldSchedulingJob<'a> {
    descriptions_job: Box<dyn SchedulingJob + 'a>,
}
60
impl SchedulingJob for BlobFieldSchedulingJob<'_> {
    /// Schedules the next scan line of the descriptions column and wraps each
    /// resulting descriptions decoder in a [`BlobFieldDecoder`] that will
    /// later fetch the actual blob payloads.
    fn schedule_next(
        &mut self,
        context: &mut SchedulerContext,
        priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        let next_descriptions = self.descriptions_job.schedule_next(context, priority)?;
        // Shadowed on purpose: from here on `priority` is the running numeric
        // priority, advanced by `num_rows` for each decoder we emit so later
        // decoders get lower I/O priority.
        let mut priority = priority.current_priority();
        let decoders = next_descriptions.decoders.into_iter().map(|decoder| {
            let decoder = decoder.into_legacy();
            let path = decoder.path;
            let mut decoder = decoder.decoder;
            let num_rows = decoder.num_rows();
            // Future that waits for the entire descriptions page to load and
            // then decodes it into the positions/sizes struct array consumed
            // by BlobFieldDecoder::wait_for_loaded.
            let descriptions_fut = async move {
                decoder
                    .wait_for_loaded(decoder.num_rows() - 1)
                    .await
                    .unwrap();
                let descriptions_task = decoder.drain(decoder.num_rows()).unwrap();
                descriptions_task.task.decode()
            }
            .boxed();
            let decoder = Box::new(BlobFieldDecoder {
                io: context.io().clone(),
                unloaded_descriptions: Some(descriptions_fut),
                // Positions / sizes start empty; they are populated when the
                // descriptions future resolves.
                positions: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
                sizes: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
                num_rows,
                loaded: VecDeque::new(),
                validity: VecDeque::new(),
                rows_loaded: 0,
                rows_drained: 0,
                base_priority: priority,
            });
            priority += num_rows;
            MessageType::DecoderReady(DecoderReady { decoder, path })
        });
        Ok(ScheduledScanLine {
            decoders: decoders.collect(),
            rows_scheduled: next_descriptions.rows_scheduled,
        })
    }

    fn num_rows(&self) -> u64 {
        self.descriptions_job.num_rows()
    }
}
108
109impl FieldScheduler for BlobFieldScheduler {
110 fn schedule_ranges<'a>(
111 &'a self,
112 ranges: &[std::ops::Range<u64>],
113 filter: &FilterExpression,
114 ) -> Result<Box<dyn SchedulingJob + 'a>> {
115 let descriptions_job = self
116 .descriptions_scheduler
117 .schedule_ranges(ranges, filter)?;
118 Ok(Box::new(BlobFieldSchedulingJob { descriptions_job }))
119 }
120
121 fn num_rows(&self) -> u64 {
122 self.descriptions_scheduler.num_rows()
123 }
124
125 fn initialize<'a>(
126 &'a self,
127 filter: &'a FilterExpression,
128 context: &'a SchedulerContext,
129 ) -> BoxFuture<'a, Result<()>> {
130 self.descriptions_scheduler.initialize(filter, context)
131 }
132}
133
/// Decoder for a blob column.
///
/// First resolves the descriptions (position/size pairs), then issues I/O for
/// the referenced payload ranges, and finally assembles the payloads into a
/// `LargeBinary` array on drain.
pub struct BlobFieldDecoder {
    /// I/O handle used to fetch the out-of-line payload bytes.
    io: Arc<dyn EncodingsIo>,
    /// Future resolving to the decoded descriptions array; taken (set to
    /// `None`) on the first `wait_for_loaded` call.
    unloaded_descriptions: Option<BoxFuture<'static, Result<ArrayRef>>>,
    /// Byte position of each value's payload (column 0 of the descriptions).
    positions: PrimitiveArray<UInt64Type>,
    /// Byte size of each value's payload (column 1 of the descriptions).
    sizes: PrimitiveArray<UInt64Type>,
    /// Total number of rows this decoder is responsible for.
    num_rows: u64,
    /// Fetched payloads, one entry per loaded row, in row order.
    loaded: VecDeque<Bytes>,
    /// Validity bits for loaded-but-not-yet-drained rows, one buffer pushed
    /// per `wait_for_loaded` call.
    validity: VecDeque<BooleanBuffer>,
    /// Number of rows whose payload I/O has been submitted and awaited.
    rows_loaded: u64,
    /// Number of rows already handed out via `drain`.
    rows_drained: u64,
    /// I/O priority of this decoder's first row; per-request priority is
    /// `base_priority + row offset`.
    base_priority: u64,
}
146
impl BlobFieldDecoder {
    /// Pops `num_values` validity bits off the front of the queued validity
    /// buffers and combines them into a single buffer.
    ///
    /// Returns `None` when every drained bit is valid so callers can omit the
    /// null buffer entirely.
    ///
    /// NOTE(review): assumes the queued buffers hold at least `num_values`
    /// bits; an over-drain would panic on the `unwrap` below — confirm callers
    /// never drain past `rows_loaded`.
    fn drain_validity(&mut self, num_values: usize) -> Result<Option<NullBuffer>> {
        let mut validity = BooleanBufferBuilder::new(num_values);
        let mut remaining = num_values;
        while remaining > 0 {
            let next = self.validity.front_mut().unwrap();
            if remaining < next.len() {
                // Partially consume the front buffer: take the prefix we need
                // and leave the (shrunken) remainder at the front of the queue.
                let slice = next.slice(0, remaining);
                validity.append_buffer(&slice);
                *next = next.slice(remaining, next.len() - remaining);
                remaining = 0;
            } else {
                // Fully consume the front buffer.
                validity.append_buffer(next);
                remaining -= next.len();
                self.validity.pop_front();
            }
        }
        let nulls = NullBuffer::new(validity.finish());
        if nulls.null_count() == 0 {
            Ok(None)
        } else {
            Ok(Some(nulls))
        }
    }
}
172
173impl std::fmt::Debug for BlobFieldDecoder {
174 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 f.debug_struct("BlobFieldDecoder")
176 .field("num_rows", &self.num_rows)
177 .field("rows_loaded", &self.rows_loaded)
178 .field("rows_drained", &self.rows_drained)
179 .finish()
180 }
181}
182
impl LogicalPageDecoder for BlobFieldDecoder {
    /// Loads payload bytes for rows up to and including `loaded_need`.
    ///
    /// On the first call this also awaits the descriptions future and caches
    /// the positions/sizes columns. One payload request is issued per row in
    /// the `[rows_loaded, loaded_need]` window.
    fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
        async move {
            if self.unloaded_descriptions.is_some() {
                let descriptions = self.unloaded_descriptions.take().unwrap().await?;
                let descriptions = descriptions.as_struct();
                self.positions = descriptions.column(0).as_primitive().clone();
                self.sizes = descriptions.column(1).as_primitive().clone();
            }
            let start = self.rows_loaded as usize;
            let end = (loaded_need + 1).min(self.num_rows) as usize;
            let positions = self.positions.values().slice(start, end - start);
            let sizes = self.sizes.values().slice(start, end - start);
            // One byte range per row; null rows carry the (1, 0) sentinel and
            // produce an empty range.
            let ranges = positions
                .iter()
                .zip(sizes.iter())
                .map(|(position, size)| *position..(*position + *size))
                .collect::<Vec<_>>();
            // (position == 1 && size == 0) is the encoder's null sentinel
            // (see BlobFieldEncoder::write_bins); everything else is valid.
            let validity = positions
                .iter()
                .zip(sizes.iter())
                .map(|(p, s)| *p != 1 || *s != 0)
                .collect::<BooleanBuffer>();
            self.validity.push_back(validity);
            self.rows_loaded = end as u64;
            let bytes = self
                .io
                .submit_request(ranges, self.base_priority + start as u64)
                .await?;
            self.loaded.extend(bytes);
            Ok(())
        }
        .boxed()
    }

    fn rows_loaded(&self) -> u64 {
        self.rows_loaded
    }

    fn num_rows(&self) -> u64 {
        self.num_rows
    }

    fn rows_drained(&self) -> u64 {
        self.rows_drained
    }

    /// Hands out the next `num_rows` loaded payloads (and their validity) as
    /// a decode task that assembles them into a `LargeBinary` array.
    fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
        let bytes = self.loaded.drain(0..num_rows as usize).collect::<Vec<_>>();
        let validity = self.drain_validity(num_rows as usize)?;
        self.rows_drained += num_rows;
        Ok(NextDecodeTask {
            num_rows,
            task: Box::new(BlobArrayDecodeTask::new(bytes, validity)),
        })
    }

    fn data_type(&self) -> &DataType {
        &DataType::LargeBinary
    }
}
244
/// Decode task that stitches per-row blob payloads into a `LargeBinary` array.
struct BlobArrayDecodeTask {
    /// One payload per row, in row order (empty for null / empty values).
    bytes: Vec<Bytes>,
    /// Optional null buffer covering the same rows; `None` means all valid.
    validity: Option<NullBuffer>,
}
249
250impl BlobArrayDecodeTask {
251 fn new(bytes: Vec<Bytes>, validity: Option<NullBuffer>) -> Self {
252 Self { bytes, validity }
253 }
254}
255
256impl DecodeArrayTask for BlobArrayDecodeTask {
257 fn decode(self: Box<Self>) -> Result<ArrayRef> {
258 let num_bytes = self.bytes.iter().map(|b| b.len()).sum::<usize>();
259 let offsets = self
260 .bytes
261 .iter()
262 .scan(0, |state, b| {
263 let start = *state;
264 *state += b.len();
265 Some(start as i64)
266 })
267 .chain(std::iter::once(num_bytes as i64))
268 .collect::<Vec<_>>();
269 let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
270 let mut buffer = Vec::with_capacity(num_bytes);
271 for bytes in self.bytes {
272 buffer.extend_from_slice(&bytes);
273 }
274 let data_buf = Buffer::from_vec(buffer);
275 Ok(Arc::new(LargeBinaryArray::new(
276 offsets,
277 data_buf,
278 self.validity,
279 )))
280 }
281}
282
/// Encoder for a blob column.
///
/// Payloads are written to out-of-line buffers; only the (position, size)
/// descriptions are encoded inline, via the wrapped struct encoder.
pub struct BlobFieldEncoder {
    description_encoder: Box<dyn FieldEncoder>,
}
286
impl BlobFieldEncoder {
    /// Creates a blob encoder that writes descriptions through the given
    /// (struct) field encoder.
    pub fn new(description_encoder: Box<dyn FieldEncoder>) -> Self {
        Self {
            description_encoder,
        }
    }

    /// Moves every non-empty binary value into an out-of-line buffer and
    /// returns the (position, size) descriptions struct array.
    ///
    /// Description sentinels (decoded in `BlobFieldDecoder::wait_for_loaded`):
    /// * valid, non-empty value -> (buffer position, size)
    /// * valid, empty value     -> (0, 0)
    /// * null value             -> (1, 0)
    ///
    /// # Errors
    ///
    /// Returns an invalid-input error when `array` is not `LargeBinary`.
    fn write_bins(array: ArrayRef, external_buffers: &mut OutOfLineBuffers) -> Result<ArrayRef> {
        let binarray = array.as_binary_opt::<i64>().ok_or_else(|| {
            Error::invalid_input_source(
                format!("Expected large_binary and received {}", array.data_type()).into(),
            )
        })?;
        let mut positions = Vec::with_capacity(array.len());
        let mut sizes = Vec::with_capacity(array.len());
        let data = binarray.values();
        // Treat a missing null buffer as all-valid so the loop below can zip
        // offsets with validity uniformly.
        let nulls = binarray
            .nulls()
            .cloned()
            .unwrap_or(NullBuffer::new_valid(binarray.len()));
        for (w, is_valid) in binarray.value_offsets().windows(2).zip(nulls.into_iter()) {
            if is_valid {
                let start = w[0] as u64;
                let end = w[1] as u64;
                let size = end - start;
                if size > 0 {
                    let val = data.slice_with_length(start as usize, size as usize);
                    let position = external_buffers.add_buffer(LanceBuffer::from(val));
                    positions.push(position);
                    sizes.push(size);
                } else {
                    // Empty (but valid) value sentinel.
                    positions.push(0);
                    sizes.push(0);
                }
            } else {
                // Null value sentinel.
                positions.push(1);
                sizes.push(0);
            }
        }
        let positions = Arc::new(UInt64Array::from(positions));
        let sizes = Arc::new(UInt64Array::from(sizes));
        let descriptions = Arc::new(StructArray::new(
            BLOB_DESC_FIELDS.clone(),
            vec![positions, sizes],
            None,
        ));
        Ok(descriptions)
    }
}
338
339impl FieldEncoder for BlobFieldEncoder {
340 fn maybe_encode(
341 &mut self,
342 array: ArrayRef,
343 external_buffers: &mut OutOfLineBuffers,
344 repdef: RepDefBuilder,
345 row_number: u64,
346 num_rows: u64,
347 ) -> Result<Vec<EncodeTask>> {
348 let descriptions = Self::write_bins(array, external_buffers)?;
349 self.description_encoder.maybe_encode(
350 descriptions,
351 external_buffers,
352 repdef,
353 row_number,
354 num_rows,
355 )
356 }
357
358 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
360 self.description_encoder.flush(external_buffers)
361 }
362
363 fn num_columns(&self) -> u32 {
364 self.description_encoder.num_columns()
365 }
366
367 fn finish(
368 &mut self,
369 external_buffers: &mut OutOfLineBuffers,
370 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
371 let inner_finished = self.description_encoder.finish(external_buffers);
372 async move {
373 let mut cols = inner_finished.await?;
374 assert_eq!(cols.len(), 1);
375 let encoding = std::mem::take(&mut cols[0].encoding);
376 let wrapped_encoding = ColumnEncoding {
377 column_encoding: Some(column_encoding::ColumnEncoding::Blob(Box::new(Blob {
378 inner: Some(Box::new(encoding)),
379 }))),
380 };
381 cols[0].encoding = wrapped_encoding;
382 Ok(cols)
383 }
384 .boxed()
385 }
386}
387
#[cfg(test)]
pub mod tests {
    use std::{
        collections::HashMap,
        sync::{Arc, LazyLock},
    };

    use arrow_array::LargeBinaryArray;
    use arrow_schema::{DataType, Field};
    use lance_arrow::BLOB_META_KEY;

    use crate::{
        format::pb::column_encoding,
        testing::{TestCases, check_round_trip_encoding_of_data, check_specific_random},
        version::LanceFileVersion,
    };

    // Field metadata that marks a column as a blob column.
    static BLOB_META: LazyLock<HashMap<String, String>> = LazyLock::new(|| {
        [(BLOB_META_KEY.to_string(), "true".to_string())]
            .iter()
            .cloned()
            .collect::<HashMap<_, _>>()
    });

    // Randomized round-trip of a non-nullable blob column.
    #[test_log::test(tokio::test)]
    async fn test_basic_blob() {
        let field = Field::new("", DataType::LargeBinary, false).with_metadata(BLOB_META.clone());
        check_specific_random(
            field,
            TestCases::basic().with_max_file_version(LanceFileVersion::V2_1),
        )
        .await;
    }

    // Round-trips a tiny nullable blob array and checks which column
    // encoding is used per file version.
    #[test_log::test(tokio::test)]
    async fn test_simple_blob() {
        let val1: &[u8] = &[1, 2, 3];
        let val2: &[u8] = &[7, 8, 9];
        let array = Arc::new(LargeBinaryArray::from(vec![Some(val1), None, Some(val2)]));
        let test_cases = TestCases::default()
            .with_max_file_version(LanceFileVersion::V2_1)
            .with_expected_encoding("packed_struct")
            .with_verify_encoding(Arc::new(|cols, version| {
                // Pre-2.1 files should carry the Blob column-encoding wrapper.
                if version < &LanceFileVersion::V2_1 {
                    assert_eq!(cols.len(), 1);
                    let col = &cols[0];
                    assert!(matches!(
                        col.encoding.column_encoding.as_ref().unwrap(),
                        column_encoding::ColumnEncoding::Blob(_)
                    ));
                }
            }));
        check_round_trip_encoding_of_data(vec![array.clone()], &test_cases, BLOB_META.clone())
            .await;

        let test_cases = TestCases::default()
            .with_min_file_version(LanceFileVersion::V2_1)
            .with_verify_encoding(Arc::new(|cols, version| {
                // NOTE(review): with min_file_version set to V2_1 this
                // `version < V2_1` branch can never run, so the !Blob
                // assertion is dead — confirm whether `>=` was intended here.
                if version < &LanceFileVersion::V2_1 {
                    assert_eq!(cols.len(), 1);
                    let col = &cols[0];
                    assert!(!matches!(
                        col.encoding.column_encoding.as_ref().unwrap(),
                        column_encoding::ColumnEncoding::Blob(_)
                    ));
                }
            }));
        check_round_trip_encoding_of_data(vec![array], &test_cases, Default::default()).await;
    }
}
461}