1use std::{collections::VecDeque, sync::Arc, vec};
5
6use arrow_array::{
7 cast::AsArray, types::UInt64Type, Array, ArrayRef, LargeBinaryArray, PrimitiveArray,
8 StructArray, UInt64Array,
9};
10use arrow_buffer::{
11 BooleanBuffer, BooleanBufferBuilder, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer,
12};
13use arrow_schema::DataType;
14use bytes::Bytes;
15use futures::{future::BoxFuture, FutureExt};
16use snafu::location;
17
18use lance_core::{datatypes::BLOB_DESC_FIELDS, Error, Result};
19
20use crate::{
21 buffer::LanceBuffer,
22 decoder::{
23 DecodeArrayTask, FilterExpression, MessageType, NextDecodeTask, PriorityRange,
24 ScheduledScanLine, SchedulerContext,
25 },
26 encoder::{EncodeTask, FieldEncoder, OutOfLineBuffers},
27 format::pb::{column_encoding, Blob, ColumnEncoding},
28 previous::decoder::{DecoderReady, FieldScheduler, LogicalPageDecoder, SchedulingJob},
29 repdef::RepDefBuilder,
30 EncodingsIo,
31};
32
33#[derive(Debug)]
45pub struct BlobFieldScheduler {
46 descriptions_scheduler: Arc<dyn FieldScheduler>,
47}
48
49impl BlobFieldScheduler {
50 pub fn new(descriptions_scheduler: Arc<dyn FieldScheduler>) -> Self {
51 Self {
52 descriptions_scheduler,
53 }
54 }
55}
56
57#[derive(Debug)]
58struct BlobFieldSchedulingJob<'a> {
59 descriptions_job: Box<dyn SchedulingJob + 'a>,
60}
61
62impl SchedulingJob for BlobFieldSchedulingJob<'_> {
63 fn schedule_next(
64 &mut self,
65 context: &mut SchedulerContext,
66 priority: &dyn PriorityRange,
67 ) -> Result<ScheduledScanLine> {
68 let next_descriptions = self.descriptions_job.schedule_next(context, priority)?;
69 let mut priority = priority.current_priority();
70 let decoders = next_descriptions.decoders.into_iter().map(|decoder| {
71 let decoder = decoder.into_legacy();
72 let path = decoder.path;
73 let mut decoder = decoder.decoder;
74 let num_rows = decoder.num_rows();
75 let descriptions_fut = async move {
76 decoder
77 .wait_for_loaded(decoder.num_rows() - 1)
78 .await
79 .unwrap();
80 let descriptions_task = decoder.drain(decoder.num_rows()).unwrap();
81 descriptions_task.task.decode()
82 }
83 .boxed();
84 let decoder = Box::new(BlobFieldDecoder {
85 io: context.io().clone(),
86 unloaded_descriptions: Some(descriptions_fut),
87 positions: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
88 sizes: PrimitiveArray::<UInt64Type>::from_iter_values(vec![]),
89 num_rows,
90 loaded: VecDeque::new(),
91 validity: VecDeque::new(),
92 rows_loaded: 0,
93 rows_drained: 0,
94 base_priority: priority,
95 });
96 priority += num_rows;
97 MessageType::DecoderReady(DecoderReady { decoder, path })
98 });
99 Ok(ScheduledScanLine {
100 decoders: decoders.collect(),
101 rows_scheduled: next_descriptions.rows_scheduled,
102 })
103 }
104
105 fn num_rows(&self) -> u64 {
106 self.descriptions_job.num_rows()
107 }
108}
109
110impl FieldScheduler for BlobFieldScheduler {
111 fn schedule_ranges<'a>(
112 &'a self,
113 ranges: &[std::ops::Range<u64>],
114 filter: &FilterExpression,
115 ) -> Result<Box<dyn SchedulingJob + 'a>> {
116 let descriptions_job = self
117 .descriptions_scheduler
118 .schedule_ranges(ranges, filter)?;
119 Ok(Box::new(BlobFieldSchedulingJob { descriptions_job }))
120 }
121
122 fn num_rows(&self) -> u64 {
123 self.descriptions_scheduler.num_rows()
124 }
125
126 fn initialize<'a>(
127 &'a self,
128 filter: &'a FilterExpression,
129 context: &'a SchedulerContext,
130 ) -> BoxFuture<'a, Result<()>> {
131 self.descriptions_scheduler.initialize(filter, context)
132 }
133}
134
135pub struct BlobFieldDecoder {
136 io: Arc<dyn EncodingsIo>,
137 unloaded_descriptions: Option<BoxFuture<'static, Result<ArrayRef>>>,
138 positions: PrimitiveArray<UInt64Type>,
139 sizes: PrimitiveArray<UInt64Type>,
140 num_rows: u64,
141 loaded: VecDeque<Bytes>,
142 validity: VecDeque<BooleanBuffer>,
143 rows_loaded: u64,
144 rows_drained: u64,
145 base_priority: u64,
146}
147
148impl BlobFieldDecoder {
149 fn drain_validity(&mut self, num_values: usize) -> Result<Option<NullBuffer>> {
150 let mut validity = BooleanBufferBuilder::new(num_values);
151 let mut remaining = num_values;
152 while remaining > 0 {
153 let next = self.validity.front_mut().unwrap();
154 if remaining < next.len() {
155 let slice = next.slice(0, remaining);
156 validity.append_buffer(&slice);
157 *next = next.slice(remaining, next.len() - remaining);
158 remaining = 0;
159 } else {
160 validity.append_buffer(next);
161 remaining -= next.len();
162 self.validity.pop_front();
163 }
164 }
165 let nulls = NullBuffer::new(validity.finish());
166 if nulls.null_count() == 0 {
167 Ok(None)
168 } else {
169 Ok(Some(nulls))
170 }
171 }
172}
173
174impl std::fmt::Debug for BlobFieldDecoder {
175 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
176 f.debug_struct("BlobFieldDecoder")
177 .field("num_rows", &self.num_rows)
178 .field("rows_loaded", &self.rows_loaded)
179 .field("rows_drained", &self.rows_drained)
180 .finish()
181 }
182}
183
184impl LogicalPageDecoder for BlobFieldDecoder {
185 fn wait_for_loaded(&mut self, loaded_need: u64) -> BoxFuture<'_, Result<()>> {
186 async move {
187 if self.unloaded_descriptions.is_some() {
188 let descriptions = self.unloaded_descriptions.take().unwrap().await?;
189 let descriptions = descriptions.as_struct();
190 self.positions = descriptions.column(0).as_primitive().clone();
191 self.sizes = descriptions.column(1).as_primitive().clone();
192 }
193 let start = self.rows_loaded as usize;
194 let end = (loaded_need + 1).min(self.num_rows) as usize;
195 let positions = self.positions.values().slice(start, end - start);
196 let sizes = self.sizes.values().slice(start, end - start);
197 let ranges = positions
198 .iter()
199 .zip(sizes.iter())
200 .map(|(position, size)| *position..(*position + *size))
201 .collect::<Vec<_>>();
202 let validity = positions
203 .iter()
204 .zip(sizes.iter())
205 .map(|(p, s)| *p != 1 || *s != 0)
206 .collect::<BooleanBuffer>();
207 self.validity.push_back(validity);
208 self.rows_loaded = end as u64;
209 let bytes = self
210 .io
211 .submit_request(ranges, self.base_priority + start as u64)
212 .await?;
213 self.loaded.extend(bytes);
214 Ok(())
215 }
216 .boxed()
217 }
218
219 fn rows_loaded(&self) -> u64 {
220 self.rows_loaded
221 }
222
223 fn num_rows(&self) -> u64 {
224 self.num_rows
225 }
226
227 fn rows_drained(&self) -> u64 {
228 self.rows_drained
229 }
230
231 fn drain(&mut self, num_rows: u64) -> Result<NextDecodeTask> {
232 let bytes = self.loaded.drain(0..num_rows as usize).collect::<Vec<_>>();
233 let validity = self.drain_validity(num_rows as usize)?;
234 self.rows_drained += num_rows;
235 Ok(NextDecodeTask {
236 num_rows,
237 task: Box::new(BlobArrayDecodeTask::new(bytes, validity)),
238 })
239 }
240
241 fn data_type(&self) -> &DataType {
242 &DataType::LargeBinary
243 }
244}
245
246struct BlobArrayDecodeTask {
247 bytes: Vec<Bytes>,
248 validity: Option<NullBuffer>,
249}
250
251impl BlobArrayDecodeTask {
252 fn new(bytes: Vec<Bytes>, validity: Option<NullBuffer>) -> Self {
253 Self { bytes, validity }
254 }
255}
256
257impl DecodeArrayTask for BlobArrayDecodeTask {
258 fn decode(self: Box<Self>) -> Result<ArrayRef> {
259 let num_bytes = self.bytes.iter().map(|b| b.len()).sum::<usize>();
260 let offsets = self
261 .bytes
262 .iter()
263 .scan(0, |state, b| {
264 let start = *state;
265 *state += b.len();
266 Some(start as i64)
267 })
268 .chain(std::iter::once(num_bytes as i64))
269 .collect::<Vec<_>>();
270 let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
271 let mut buffer = Vec::with_capacity(num_bytes);
272 for bytes in self.bytes {
273 buffer.extend_from_slice(&bytes);
274 }
275 let data_buf = Buffer::from_vec(buffer);
276 Ok(Arc::new(LargeBinaryArray::new(
277 offsets,
278 data_buf,
279 self.validity,
280 )))
281 }
282}
283
284pub struct BlobFieldEncoder {
285 description_encoder: Box<dyn FieldEncoder>,
286}
287
288impl BlobFieldEncoder {
289 pub fn new(description_encoder: Box<dyn FieldEncoder>) -> Self {
290 Self {
291 description_encoder,
292 }
293 }
294
295 fn write_bins(array: ArrayRef, external_buffers: &mut OutOfLineBuffers) -> Result<ArrayRef> {
296 let binarray = array
297 .as_binary_opt::<i64>()
298 .ok_or_else(|| Error::InvalidInput {
299 source: format!("Expected large_binary and received {}", array.data_type()).into(),
300 location: location!(),
301 })?;
302 let mut positions = Vec::with_capacity(array.len());
303 let mut sizes = Vec::with_capacity(array.len());
304 let data = binarray.values();
305 let nulls = binarray
306 .nulls()
307 .cloned()
308 .unwrap_or(NullBuffer::new_valid(binarray.len()));
309 for (w, is_valid) in binarray.value_offsets().windows(2).zip(nulls.into_iter()) {
310 if is_valid {
311 let start = w[0] as u64;
312 let end = w[1] as u64;
313 let size = end - start;
314 if size > 0 {
315 let val = data.slice_with_length(start as usize, size as usize);
316 let position = external_buffers.add_buffer(LanceBuffer::from(val));
317 positions.push(position);
318 sizes.push(size);
319 } else {
320 positions.push(0);
322 sizes.push(0);
323 }
324 } else {
325 positions.push(1);
327 sizes.push(0);
328 }
329 }
330 let positions = Arc::new(UInt64Array::from(positions));
331 let sizes = Arc::new(UInt64Array::from(sizes));
332 let descriptions = Arc::new(StructArray::new(
333 BLOB_DESC_FIELDS.clone(),
334 vec![positions, sizes],
335 None,
336 ));
337 Ok(descriptions)
338 }
339}
340
341impl FieldEncoder for BlobFieldEncoder {
342 fn maybe_encode(
343 &mut self,
344 array: ArrayRef,
345 external_buffers: &mut OutOfLineBuffers,
346 repdef: RepDefBuilder,
347 row_number: u64,
348 num_rows: u64,
349 ) -> Result<Vec<EncodeTask>> {
350 let descriptions = Self::write_bins(array, external_buffers)?;
351 self.description_encoder.maybe_encode(
352 descriptions,
353 external_buffers,
354 repdef,
355 row_number,
356 num_rows,
357 )
358 }
359
360 fn flush(&mut self, external_buffers: &mut OutOfLineBuffers) -> Result<Vec<EncodeTask>> {
362 self.description_encoder.flush(external_buffers)
363 }
364
365 fn num_columns(&self) -> u32 {
366 self.description_encoder.num_columns()
367 }
368
369 fn finish(
370 &mut self,
371 external_buffers: &mut OutOfLineBuffers,
372 ) -> BoxFuture<'_, Result<Vec<crate::encoder::EncodedColumn>>> {
373 let inner_finished = self.description_encoder.finish(external_buffers);
374 async move {
375 let mut cols = inner_finished.await?;
376 assert_eq!(cols.len(), 1);
377 let encoding = std::mem::take(&mut cols[0].encoding);
378 let wrapped_encoding = ColumnEncoding {
379 column_encoding: Some(column_encoding::ColumnEncoding::Blob(Box::new(Blob {
380 inner: Some(Box::new(encoding)),
381 }))),
382 };
383 cols[0].encoding = wrapped_encoding;
384 Ok(cols)
385 }
386 .boxed()
387 }
388}
389
#[cfg(test)]
pub mod tests {
    use std::{
        collections::HashMap,
        sync::{Arc, LazyLock},
    };

    use arrow_array::LargeBinaryArray;
    use arrow_schema::{DataType, Field};
    use lance_core::datatypes::BLOB_META_KEY;

    use crate::{
        format::pb::column_encoding,
        testing::{check_round_trip_encoding_of_data, check_round_trip_encoding_random, TestCases},
        version::LanceFileVersion,
    };

    /// Field metadata that marks a column as a blob column.
    static BLOB_META: LazyLock<HashMap<String, String>> =
        LazyLock::new(|| HashMap::from([(BLOB_META_KEY.to_string(), "true".to_string())]));

    #[test_log::test(tokio::test)]
    async fn test_blob() {
        // Random round trip of a non-nullable blob field with the 2.0 file format.
        let field = Field::new("", DataType::LargeBinary, false).with_metadata(BLOB_META.clone());
        check_round_trip_encoding_random(field, LanceFileVersion::V2_0).await;
    }

    #[test_log::test(tokio::test)]
    async fn test_simple_blob() {
        let first: &[u8] = &[1, 2, 3];
        let last: &[u8] = &[7, 8, 9];
        let array = Arc::new(LargeBinaryArray::from(vec![Some(first), None, Some(last)]));

        // With blob metadata, the single column must use the Blob encoding.
        let expect_blob = TestCases::default().with_verify_encoding(Arc::new(|cols| {
            assert_eq!(cols.len(), 1);
            let encoding = cols[0].encoding.column_encoding.as_ref().unwrap();
            assert!(matches!(encoding, column_encoding::ColumnEncoding::Blob(_)));
        }));
        check_round_trip_encoding_of_data(vec![array.clone()], &expect_blob, BLOB_META.clone())
            .await;

        // Without blob metadata, the column must not use the Blob encoding.
        let expect_not_blob = TestCases::default().with_verify_encoding(Arc::new(|cols| {
            assert_eq!(cols.len(), 1);
            let encoding = cols[0].encoding.column_encoding.as_ref().unwrap();
            assert!(!matches!(encoding, column_encoding::ColumnEncoding::Blob(_)));
        }));
        check_round_trip_encoding_of_data(vec![array], &expect_not_blob, Default::default()).await;
    }
}