1use std::{collections::HashMap, future::Future, ops::DerefMut, sync::Arc};
5
6use arrow::array::AsArray;
7use arrow::datatypes::{UInt32Type, UInt64Type, UInt8Type};
8use arrow_array::builder::{LargeBinaryBuilder, PrimitiveBuilder, StringBuilder};
9use arrow_array::Array;
10use arrow_array::RecordBatch;
11use arrow_schema::DataType as ArrowDataType;
12use lance_arrow::{FieldExt, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY};
13use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
14use object_store::path::Path;
15use snafu::location;
16use tokio::io::AsyncWriteExt;
17use tokio::sync::Mutex;
18
19use super::take::TakeBuilder;
20use super::{Dataset, ProjectionRequest};
21use arrow_array::StructArray;
22use lance_core::datatypes::{BlobKind, BlobVersion};
23use lance_core::utils::blob::blob_path;
24use lance_core::{utils::address::RowAddress, Error, Result};
25use lance_io::traits::Reader;
26
27pub const BLOB_VERSION_CONFIG_KEY: &str = "lance.blob.version";
28
29pub fn blob_version_from_config(config: &HashMap<String, String>) -> BlobVersion {
30 config
31 .get(BLOB_VERSION_CONFIG_KEY)
32 .and_then(|value| BlobVersion::from_config_value(value))
33 .unwrap_or(BlobVersion::V1)
34}
35
36const INLINE_MAX: usize = 64 * 1024; const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; const PACK_FILE_MAX_SIZE: usize = 1024 * 1024 * 1024; struct PackWriter {
47 object_store: ObjectStore,
48 data_dir: Path,
49 data_file_key: String,
50 max_pack_size: usize,
51 current_blob_id: Option<u32>,
52 writer: Option<lance_io::object_writer::ObjectWriter>,
53 current_size: usize,
54}
55
56impl PackWriter {
57 fn new(object_store: ObjectStore, data_dir: Path, data_file_key: String) -> Self {
58 Self {
59 object_store,
60 data_dir,
61 data_file_key,
62 max_pack_size: PACK_FILE_MAX_SIZE,
63 current_blob_id: None,
64 writer: None,
65 current_size: 0,
66 }
67 }
68
69 async fn start_new_pack(&mut self, blob_id: u32) -> Result<()> {
70 let path = blob_path(&self.data_dir, &self.data_file_key, blob_id);
71 let writer = self.object_store.create(&path).await?;
72 self.writer = Some(writer);
73 self.current_blob_id = Some(blob_id);
74 self.current_size = 0;
75 Ok(())
76 }
77
78 async fn write_with_allocator<F>(
87 &mut self,
88 alloc_blob_id: &mut F,
89 data: &[u8],
90 ) -> Result<(u32, u64)>
91 where
92 F: FnMut() -> u32,
93 {
94 let len = data.len();
95 if self
96 .current_blob_id
97 .map(|_| self.current_size + len > self.max_pack_size)
98 .unwrap_or(true)
99 {
100 let blob_id = alloc_blob_id();
101 self.finish().await?;
102 self.start_new_pack(blob_id).await?;
103 }
104
105 let writer = self.writer.as_mut().expect("pack writer is initialized");
106 let position = self.current_size as u64;
107 writer.write_all(data).await?;
108 self.current_size += len;
109 Ok((self.current_blob_id.expect("pack blob id"), position))
110 }
111
112 async fn finish(&mut self) -> Result<()> {
113 if let Some(mut writer) = self.writer.take() {
114 writer.shutdown().await?;
115 }
116 self.current_blob_id = None;
117 self.current_size = 0;
118 Ok(())
119 }
120}
121
/// Rewrites blob columns at write time: inline data is kept in place while
/// larger values are spilled to dedicated or packed blob files, leaving only
/// descriptors (kind / blob_id / size / position) in the record batch.
pub struct BlobPreprocessor {
    object_store: ObjectStore,
    // Directory where dedicated and pack blob files are written.
    data_dir: Path,
    // Key of the data file the blobs are associated with.
    data_file_key: String,
    // Next blob id to hand out; shared between dedicated and packed blobs.
    local_counter: u32,
    // Writer that aggregates medium-sized blobs into shared pack files.
    pack_writer: PackWriter,
}
134
impl BlobPreprocessor {
    /// Creates a preprocessor whose dedicated and packed blob files are
    /// written next to the data file identified by `data_file_key`.
    pub(crate) fn new(object_store: ObjectStore, data_dir: Path, data_file_key: String) -> Self {
        let pack_writer = PackWriter::new(
            object_store.clone(),
            data_dir.clone(),
            data_file_key.clone(),
        );
        Self {
            object_store,
            data_dir,
            data_file_key,
            // Blob ids start at 1; 0 is never allocated.
            local_counter: 1,
            pack_writer,
        }
    }

    /// Allocates the next blob id (unique within this data file).
    fn next_blob_id(&mut self) -> u32 {
        let id = self.local_counter;
        self.local_counter += 1;
        id
    }

    /// Writes `data` to its own dedicated blob file and returns its path.
    async fn write_dedicated(&mut self, blob_id: u32, data: &[u8]) -> Result<Path> {
        let path = blob_path(&self.data_dir, &self.data_file_key, blob_id);
        let mut writer = self.object_store.create(&path).await?;
        writer.write_all(data).await?;
        writer.shutdown().await?;
        Ok(path)
    }

    /// Appends `data` to the current pack file; returns (pack blob id, byte
    /// offset within the pack).
    async fn write_packed(&mut self, data: &[u8]) -> Result<(u32, u64)> {
        // Split the borrows so the allocator closure (which mutates the
        // counter) and the pack writer can be borrowed mutably at once.
        let (counter, pack_writer) = (&mut self.local_counter, &mut self.pack_writer);
        pack_writer
            .write_with_allocator(
                &mut || {
                    let id = *counter;
                    *counter += 1;
                    id
                },
                data,
            )
            .await
    }
    /// Rewrites every blob-v2 column of `batch` into a descriptor struct of
    /// (kind, data, uri, blob_id, blob_size, position), spilling values above
    /// `INLINE_MAX` to packed files and values above the dedicated threshold
    /// to dedicated files. Non-blob columns pass through untouched.
    pub(crate) async fn preprocess_batch(&mut self, batch: &RecordBatch) -> Result<RecordBatch> {
        let mut new_columns = Vec::with_capacity(batch.num_columns());
        let mut new_fields = Vec::with_capacity(batch.num_columns());

        for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
            if !field.is_blob_v2() {
                new_columns.push(array.clone());
                new_fields.push(field.clone());
                continue;
            }

            // Field metadata may raise (never lower) the dedicated-file
            // threshold: values <= DEDICATED_THRESHOLD are filtered out.
            let dedicated_threshold = field
                .metadata()
                .get(BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY)
                .and_then(|value| value.parse::<usize>().ok())
                .filter(|&value| value > DEDICATED_THRESHOLD)
                .unwrap_or(DEDICATED_THRESHOLD);

            // Incoming blob columns are structs of {data, uri}.
            let struct_arr = array
                .as_any()
                .downcast_ref::<arrow_array::StructArray>()
                .ok_or_else(|| {
                    Error::invalid_input("Blob column was not a struct array", location!())
                })?;

            let data_col = struct_arr
                .column_by_name("data")
                .ok_or_else(|| {
                    Error::invalid_input("Blob struct missing `data` field", location!())
                })?
                .as_binary::<i64>();
            let uri_col = struct_arr
                .column_by_name("uri")
                .ok_or_else(|| {
                    Error::invalid_input("Blob struct missing `uri` field", location!())
                })?
                .as_string::<i32>();

            // One output builder per descriptor field; every row appends to
            // all six builders so they stay the same length.
            let mut data_builder = LargeBinaryBuilder::with_capacity(struct_arr.len(), 0);
            let mut uri_builder = StringBuilder::with_capacity(struct_arr.len(), 0);
            let mut blob_id_builder =
                PrimitiveBuilder::<arrow_array::types::UInt32Type>::with_capacity(struct_arr.len());
            let mut blob_size_builder =
                PrimitiveBuilder::<arrow_array::types::UInt64Type>::with_capacity(struct_arr.len());
            let mut kind_builder = PrimitiveBuilder::<UInt8Type>::with_capacity(struct_arr.len());
            let mut position_builder =
                PrimitiveBuilder::<arrow_array::types::UInt64Type>::with_capacity(struct_arr.len());

            let struct_nulls = struct_arr.nulls();

            for i in 0..struct_arr.len() {
                // Null row: propagate nulls to every descriptor field.
                if struct_arr.is_null(i) {
                    data_builder.append_null();
                    uri_builder.append_null();
                    blob_id_builder.append_null();
                    blob_size_builder.append_null();
                    kind_builder.append_null();
                    position_builder.append_null();
                    continue;
                }

                let has_data = !data_col.is_null(i);
                let has_uri = !uri_col.is_null(i);
                let data_len = if has_data { data_col.value(i).len() } else { 0 };

                // Large values get their own dedicated blob file.
                if has_data && data_len > dedicated_threshold {
                    let blob_id = self.next_blob_id();
                    self.write_dedicated(blob_id, data_col.value(i)).await?;

                    kind_builder.append_value(BlobKind::Dedicated as u8);
                    data_builder.append_null();
                    uri_builder.append_null();
                    blob_id_builder.append_value(blob_id);
                    blob_size_builder.append_value(data_len as u64);
                    position_builder.append_null();
                    continue;
                }

                // Medium values go to a shared pack file.
                if has_data && data_len > INLINE_MAX {
                    let (pack_blob_id, position) = self.write_packed(data_col.value(i)).await?;

                    kind_builder.append_value(BlobKind::Packed as u8);
                    data_builder.append_null();
                    uri_builder.append_null();
                    blob_id_builder.append_value(pack_blob_id);
                    blob_size_builder.append_value(data_len as u64);
                    position_builder.append_value(position);
                    continue;
                }

                // External reference: only the uri is kept.
                // NOTE: `uri` takes precedence over small inline data when
                // both are present for the same row.
                if has_uri {
                    let uri_val = uri_col.value(i);
                    kind_builder.append_value(BlobKind::External as u8);
                    data_builder.append_null();
                    uri_builder.append_value(uri_val);
                    blob_id_builder.append_null();
                    blob_size_builder.append_null();
                    position_builder.append_null();
                    continue;
                }

                // Small values stay inline; otherwise (no data, no uri) the
                // descriptor is all-null.
                if has_data {
                    kind_builder.append_value(BlobKind::Inline as u8);
                    let value = data_col.value(i);
                    data_builder.append_value(value);
                    uri_builder.append_null();
                    blob_id_builder.append_null();
                    blob_size_builder.append_null();
                    position_builder.append_null();
                } else {
                    data_builder.append_null();
                    uri_builder.append_null();
                    blob_id_builder.append_null();
                    blob_size_builder.append_null();
                    kind_builder.append_null();
                    position_builder.append_null();
                }
            }

            let child_fields = vec![
                arrow_schema::Field::new("kind", ArrowDataType::UInt8, true),
                arrow_schema::Field::new("data", ArrowDataType::LargeBinary, true),
                arrow_schema::Field::new("uri", ArrowDataType::Utf8, true),
                arrow_schema::Field::new("blob_id", ArrowDataType::UInt32, true),
                arrow_schema::Field::new("blob_size", ArrowDataType::UInt64, true),
                arrow_schema::Field::new("position", ArrowDataType::UInt64, true),
            ];

            // Child array order must match `child_fields` above.
            let struct_array = arrow_array::StructArray::try_new(
                child_fields.clone().into(),
                vec![
                    Arc::new(kind_builder.finish()),
                    Arc::new(data_builder.finish()),
                    Arc::new(uri_builder.finish()),
                    Arc::new(blob_id_builder.finish()),
                    Arc::new(blob_size_builder.finish()),
                    Arc::new(position_builder.finish()),
                ],
                struct_nulls.cloned(),
            )?;

            new_columns.push(Arc::new(struct_array));
            new_fields.push(Arc::new(
                arrow_schema::Field::new(
                    field.name(),
                    ArrowDataType::Struct(child_fields.into()),
                    field.is_nullable(),
                )
                .with_metadata(field.metadata().clone()),
            ));
        }

        let new_schema = Arc::new(arrow_schema::Schema::new_with_metadata(
            new_fields
                .iter()
                .map(|f| f.as_ref().clone())
                .collect::<Vec<_>>(),
            batch.schema().metadata().clone(),
        ));

        RecordBatch::try_new(new_schema, new_columns)
            .map_err(|e| Error::invalid_input(e.to_string(), location!()))
    }

    /// Flushes and closes any open pack file; call once after the last batch.
    pub(crate) async fn finish(&mut self) -> Result<()> {
        self.pack_writer.finish().await
    }
}
347
348pub fn schema_has_blob_v2(schema: &lance_core::datatypes::Schema) -> bool {
349 schema.fields.iter().any(|f| f.is_blob_v2())
350}
351
352pub async fn preprocess_blob_batches(
353 batches: &[RecordBatch],
354 pre: &mut BlobPreprocessor,
355) -> Result<Vec<RecordBatch>> {
356 let mut out = Vec::with_capacity(batches.len());
357 for batch in batches {
358 out.push(pre.preprocess_batch(batch).await?);
359 }
360 Ok(out)
361}
362
/// Lifecycle of the lazily-opened reader behind a [`BlobFile`].
///
/// The `u64` is the blob-relative cursor; it is tracked even before the
/// reader is opened so `seek`/`tell` work without touching storage.
#[derive(Debug)]
enum ReaderState {
    // No reader opened yet; holds the pending cursor.
    Uninitialized(u64),
    // Reader opened; holds (cursor, reader).
    Open((u64, Arc<dyn Reader>)),
    // Explicitly closed; all further operations fail.
    Closed,
}
373
/// A file-like handle to a single blob value, regardless of how it is stored
/// (inline in the data file, dedicated file, shared pack file, or external
/// URI). Reading is lazy: the underlying object is opened on first access.
#[derive(Debug)]
pub struct BlobFile {
    object_store: Arc<ObjectStore>,
    // Path of the file that physically contains the blob bytes.
    path: Path,
    // Lazily-opened reader plus the blob-relative cursor.
    reader: Arc<Mutex<ReaderState>>,
    // Byte offset of the blob within `path` (0 for dedicated/external).
    position: u64,
    // Length of the blob in bytes.
    size: u64,
    kind: BlobKind,
    // Set only for external blobs.
    uri: Option<String>,
}
385
386impl BlobFile {
387 pub fn new_inline(
391 dataset: Arc<Dataset>,
392 field_id: u32,
393 row_addr: u64,
394 position: u64,
395 size: u64,
396 ) -> Self {
397 let frag_id = RowAddress::from(row_addr).fragment_id();
398 let frag = dataset.get_fragment(frag_id as usize).unwrap();
399 let data_file = frag.data_file_for_field(field_id).unwrap();
400 let data_file = dataset.data_dir().child(data_file.path.as_str());
401 Self {
402 object_store: dataset.object_store.clone(),
403 path: data_file,
404 position,
405 size,
406 kind: BlobKind::Inline,
407 uri: None,
408 reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))),
409 }
410 }
411
412 pub fn new_dedicated(dataset: Arc<Dataset>, path: Path, size: u64) -> Self {
413 Self {
414 object_store: dataset.object_store.clone(),
415 path,
416 position: 0,
417 size,
418 kind: BlobKind::Dedicated,
419 uri: None,
420 reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))),
421 }
422 }
423 pub fn new_packed(dataset: Arc<Dataset>, path: Path, position: u64, size: u64) -> Self {
424 Self {
425 object_store: dataset.object_store.clone(),
426 path,
427 position,
428 size,
429 kind: BlobKind::Packed,
430 uri: None,
431 reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))),
432 }
433 }
434 pub async fn new_external(
435 uri: String,
436 size: u64,
437 registry: Arc<ObjectStoreRegistry>,
438 params: Arc<ObjectStoreParams>,
439 ) -> Result<Self> {
440 let (object_store, path) =
441 ObjectStore::from_uri_and_params(registry, &uri, ¶ms).await?;
442 let size = if size > 0 {
443 size
444 } else {
445 object_store.size(&path).await?
446 };
447 Ok(Self {
448 object_store,
449 path,
450 position: 0,
451 size,
452 kind: BlobKind::External,
453 uri: Some(uri),
454 reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))),
455 })
456 }
457
458 pub async fn close(&self) -> Result<()> {
460 let mut reader = self.reader.lock().await;
461 *reader = ReaderState::Closed;
462 Ok(())
463 }
464
465 pub async fn is_closed(&self) -> bool {
467 matches!(*self.reader.lock().await, ReaderState::Closed)
468 }
469
470 async fn do_with_reader<
471 T,
472 Fut: Future<Output = Result<(u64, T)>>,
473 Func: FnOnce(u64, Arc<dyn Reader>) -> Fut,
474 >(
475 &self,
476 func: Func,
477 ) -> Result<T> {
478 let mut reader = self.reader.lock().await;
479 if let ReaderState::Uninitialized(cursor) = *reader {
480 let opened = self.object_store.open(&self.path).await?;
481 let opened = Arc::<dyn Reader>::from(opened);
482 *reader = ReaderState::Open((cursor, opened.clone()));
483 }
484 match reader.deref_mut() {
485 ReaderState::Open((cursor, reader)) => {
486 let (new_cursor, data) = func(*cursor, reader.clone()).await?;
487 *cursor = new_cursor;
488 Ok(data)
489 }
490 ReaderState::Closed => Err(Error::invalid_input(
491 "Blob file is already closed".to_string(),
492 location!(),
493 )),
494 _ => unreachable!(),
495 }
496 }
497
498 pub async fn read(&self) -> Result<bytes::Bytes> {
504 let position = self.position;
505 let size = self.size;
506 self.do_with_reader(|cursor, reader| async move {
507 let start = position as usize + cursor as usize;
508 let end = (position + size) as usize;
509 Ok((end as u64, reader.get_range(start..end).await?))
510 })
511 .await
512 }
513
514 pub async fn read_up_to(&self, len: usize) -> Result<bytes::Bytes> {
519 let position = self.position;
520 let size = self.size;
521 self.do_with_reader(|cursor, reader| async move {
522 let start = position as usize + cursor as usize;
523 let read_size = len.min((size - cursor) as usize);
524 let end = start + read_size;
525 let data = reader.get_range(start..end).await?;
526 Ok((end as u64 - position, data))
527 })
528 .await
529 }
530
531 pub async fn seek(&self, new_cursor: u64) -> Result<()> {
533 let mut reader = self.reader.lock().await;
534 match reader.deref_mut() {
535 ReaderState::Open((cursor, _)) => {
536 *cursor = new_cursor;
537 Ok(())
538 }
539 ReaderState::Closed => Err(Error::invalid_input(
540 "Blob file is already closed".to_string(),
541 location!(),
542 )),
543 ReaderState::Uninitialized(cursor) => {
544 *cursor = new_cursor;
545 Ok(())
546 }
547 }
548 }
549
550 pub async fn tell(&self) -> Result<u64> {
552 let reader = self.reader.lock().await;
553 match *reader {
554 ReaderState::Open((cursor, _)) => Ok(cursor),
555 ReaderState::Closed => Err(Error::invalid_input(
556 "Blob file is already closed".to_string(),
557 location!(),
558 )),
559 ReaderState::Uninitialized(cursor) => Ok(cursor),
560 }
561 }
562
563 pub fn size(&self) -> u64 {
565 self.size
566 }
567
568 pub fn position(&self) -> u64 {
569 self.position
570 }
571
572 pub fn data_path(&self) -> &Path {
573 &self.path
574 }
575
576 pub fn kind(&self) -> BlobKind {
577 self.kind
578 }
579
580 pub fn uri(&self) -> Option<&str> {
581 self.uri.as_deref()
582 }
583}
584
585pub(super) async fn take_blobs(
586 dataset: &Arc<Dataset>,
587 row_ids: &[u64],
588 column: &str,
589) -> Result<Vec<BlobFile>> {
590 let projection = dataset.schema().project(&[column])?;
591 let blob_field = &projection.fields[0];
592 let blob_field_id = blob_field.id;
593 if !projection.fields[0].is_blob() {
594 return Err(Error::InvalidInput {
595 location: location!(),
596 source: format!("the column '{}' is not a blob column", column).into(),
597 });
598 }
599 let description_and_addr = dataset
600 .take_builder(row_ids, projection)?
601 .with_row_address(true)
602 .execute()
603 .await?;
604 let descriptions = description_and_addr.column(0).as_struct();
605 let row_addrs = description_and_addr.column(1).as_primitive::<UInt64Type>();
606 let blob_field_id = blob_field_id as u32;
607
608 match dataset.blob_version() {
609 BlobVersion::V1 => collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs),
610 BlobVersion::V2 => {
611 collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs).await
612 }
613 }
614}
615
616pub async fn take_blobs_by_addresses(
624 dataset: &Arc<Dataset>,
625 row_addrs: &[u64],
626 column: &str,
627) -> Result<Vec<BlobFile>> {
628 let projection = dataset.schema().project(&[column])?;
629 let blob_field = &projection.fields[0];
630 let blob_field_id = blob_field.id;
631 if !projection.fields[0].is_blob() {
632 return Err(Error::InvalidInput {
633 location: location!(),
634 source: format!("the column '{}' is not a blob column", column).into(),
635 });
636 }
637
638 let projection_request = ProjectionRequest::from(projection);
640 let projection_plan = Arc::new(projection_request.into_projection_plan(dataset.clone())?);
641
642 let description_and_addr =
646 TakeBuilder::try_new_from_addresses(dataset.clone(), row_addrs.to_vec(), projection_plan)?
647 .with_row_address(true)
648 .execute()
649 .await?;
650
651 let descriptions = description_and_addr.column(0).as_struct();
652 let row_addrs_result = description_and_addr.column(1).as_primitive::<UInt64Type>();
653 let blob_field_id = blob_field_id as u32;
654
655 match dataset.blob_version() {
656 BlobVersion::V1 => {
657 collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs_result)
658 }
659 BlobVersion::V2 => {
660 collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs_result).await
661 }
662 }
663}
664
665fn collect_blob_files_v1(
666 dataset: &Arc<Dataset>,
667 blob_field_id: u32,
668 descriptions: &StructArray,
669 row_addrs: &arrow::array::PrimitiveArray<UInt64Type>,
670) -> Result<Vec<BlobFile>> {
671 let positions = descriptions.column(0).as_primitive::<UInt64Type>();
672 let sizes = descriptions.column(1).as_primitive::<UInt64Type>();
673
674 Ok(row_addrs
675 .values()
676 .iter()
677 .zip(positions.iter())
678 .zip(sizes.iter())
679 .filter_map(|((row_addr, position), size)| {
680 let position = position?;
681 let size = size?;
682 Some((*row_addr, position, size))
683 })
684 .map(|(row_addr, position, size)| {
685 BlobFile::new_inline(dataset.clone(), blob_field_id, row_addr, position, size)
686 })
687 .collect())
688}
689
690async fn collect_blob_files_v2(
691 dataset: &Arc<Dataset>,
692 blob_field_id: u32,
693 descriptions: &StructArray,
694 row_addrs: &arrow::array::PrimitiveArray<UInt64Type>,
695) -> Result<Vec<BlobFile>> {
696 let kinds = descriptions.column(0).as_primitive::<UInt8Type>();
697 let positions = descriptions.column(1).as_primitive::<UInt64Type>();
698 let sizes = descriptions.column(2).as_primitive::<UInt64Type>();
699 let blob_ids = descriptions.column(3).as_primitive::<UInt32Type>();
700 let blob_uris = descriptions.column(4).as_string::<i32>();
701
702 let mut files = Vec::with_capacity(row_addrs.len());
703 for (idx, row_addr) in row_addrs.values().iter().enumerate() {
704 let kind = BlobKind::try_from(kinds.value(idx))?;
705
706 if matches!(kind, BlobKind::Inline) && positions.value(idx) == 0 && sizes.value(idx) == 0 {
708 continue;
709 }
710
711 match kind {
712 BlobKind::Inline => {
713 let position = positions.value(idx);
714 let size = sizes.value(idx);
715 files.push(BlobFile::new_inline(
716 dataset.clone(),
717 blob_field_id,
718 *row_addr,
719 position,
720 size,
721 ));
722 }
723 BlobKind::Dedicated => {
724 let blob_id = blob_ids.value(idx);
725 let size = sizes.value(idx);
726 let frag_id = RowAddress::from(*row_addr).fragment_id();
727 let frag =
728 dataset
729 .get_fragment(frag_id as usize)
730 .ok_or_else(|| Error::Internal {
731 message: "Fragment not found".to_string(),
732 location: location!(),
733 })?;
734 let data_file =
735 frag.data_file_for_field(blob_field_id)
736 .ok_or_else(|| Error::Internal {
737 message: "Data file not found for blob field".to_string(),
738 location: location!(),
739 })?;
740
741 let data_file_key = data_file_key_from_path(data_file.path.as_str());
742 let path = blob_path(&dataset.data_dir(), data_file_key, blob_id);
743 files.push(BlobFile::new_dedicated(dataset.clone(), path, size));
744 }
745 BlobKind::Packed => {
746 let blob_id = blob_ids.value(idx);
747 let size = sizes.value(idx);
748 let position = positions.value(idx);
749 let frag_id = RowAddress::from(*row_addr).fragment_id();
750 let frag =
751 dataset
752 .get_fragment(frag_id as usize)
753 .ok_or_else(|| Error::Internal {
754 message: "Fragment not found".to_string(),
755 location: location!(),
756 })?;
757 let data_file =
758 frag.data_file_for_field(blob_field_id)
759 .ok_or_else(|| Error::Internal {
760 message: "Data file not found for blob field".to_string(),
761 location: location!(),
762 })?;
763 let data_file_key = data_file_key_from_path(data_file.path.as_str());
764 let path = blob_path(&dataset.data_dir(), data_file_key, blob_id);
765 files.push(BlobFile::new_packed(dataset.clone(), path, position, size));
766 }
767 BlobKind::External => {
768 let uri = blob_uris.value(idx).to_string();
769 let size = sizes.value(idx);
770 let registry = dataset.session.store_registry();
771 let params = dataset
772 .store_params
773 .as_ref()
774 .map(|p| Arc::new((**p).clone()))
775 .unwrap_or_else(|| Arc::new(ObjectStoreParams::default()));
776 files.push(BlobFile::new_external(uri, size, registry, params).await?);
777 }
778 }
779 }
780
781 Ok(files)
782}
783
/// Extracts the data-file key from a data file path: the final path segment
/// with any trailing `.lance` extension removed.
fn data_file_key_from_path(path: &str) -> &str {
    let filename = match path.rfind('/') {
        Some(sep) => &path[sep + 1..],
        None => path,
    };
    match filename.strip_suffix(".lance") {
        Some(stem) => stem,
        None => filename,
    }
}
788
// Tests cover v1 blob take paths (fixture-based), v2 blob writing via
// BlobArrayBuilder, and the dedicated-size threshold override behavior.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::{array::AsArray, datatypes::UInt64Type};
    use arrow_array::RecordBatch;
    use arrow_array::{RecordBatchIterator, UInt32Array};
    use arrow_schema::{DataType, Field, Schema};
    use futures::TryStreamExt;
    use lance_arrow::{DataTypeExt, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY};
    use lance_io::stream::RecordBatchStream;

    use lance_core::datatypes::BlobKind;
    use lance_core::{utils::tempfile::TempStrDir, Error, Result};
    use lance_datagen::{array, BatchCount, RowCount};
    use lance_file::version::LanceFileVersion;

    use super::data_file_key_from_path;
    use crate::{
        blob::{blob_field, BlobArrayBuilder},
        dataset::WriteParams,
        utils::test::TestDatasetGenerator,
        Dataset,
    };

    // Shared fixture: a 10x10-row dataset with a step column ("filterme")
    // and a blob column ("blobs"), written in a "hostile" layout.
    struct BlobTestFixture {
        _test_dir: TempStrDir,
        dataset: Arc<Dataset>,
        data: Vec<RecordBatch>,
    }

    impl BlobTestFixture {
        async fn new() -> Self {
            let test_dir = TempStrDir::default();

            let data = lance_datagen::gen_batch()
                .col("filterme", array::step::<UInt64Type>())
                .col("blobs", array::blob())
                .into_reader_rows(RowCount::from(10), BatchCount::from(10))
                .map(|batch| Ok(batch?))
                .collect::<Result<Vec<_>>>()
                .unwrap();

            let dataset = Arc::new(
                TestDatasetGenerator::new(data.clone(), LanceFileVersion::default())
                    .make_hostile(&test_dir)
                    .await,
            );

            Self {
                _test_dir: test_dir,
                dataset,
                data,
            }
        }
    }

    #[tokio::test]
    pub async fn test_take_blobs() {
        let fixture = BlobTestFixture::new().await;

        let row_ids = fixture
            .dataset
            .scan()
            .project::<String>(&[])
            .unwrap()
            .filter("filterme >= 50")
            .unwrap()
            .with_row_id()
            .try_into_batch()
            .await
            .unwrap();
        let row_ids = row_ids.column(0).as_primitive::<UInt64Type>().values();
        let row_ids = vec![row_ids[5], row_ids[17], row_ids[33]];

        let blobs = fixture.dataset.take_blobs(&row_ids, "blobs").await.unwrap();

        // The filter skips the first 50 rows (5 batches), so filtered index N
        // maps to batch 5 + N/10, row N%10 of the original data.
        for (actual_idx, (expected_batch_idx, expected_row_idx)) in
            [(5, 5), (6, 7), (8, 3)].iter().enumerate()
        {
            let val = blobs[actual_idx].read().await.unwrap();
            let expected = fixture.data[*expected_batch_idx]
                .column(1)
                .as_binary::<i64>()
                .value(*expected_row_idx);

            assert_eq!(&val, expected);
        }
    }

    #[tokio::test]
    pub async fn test_take_blobs_by_indices() {
        let fixture = BlobTestFixture::new().await;

        let fragments = fixture.dataset.fragments();
        assert!(fragments.len() >= 2);
        let mut indices = Vec::with_capacity(fragments.len());
        let mut last = 2;

        // Dataset index 2 of each fragment (row 2, 12, 22, ...).
        for frag in fragments.iter() {
            indices.push(last as u64);
            last += frag.num_rows().unwrap_or(0);
        }
        indices.pop();

        assert_eq!(indices, [2, 12, 22, 32, 42, 52, 62, 72, 82]);
        let blobs = fixture
            .dataset
            .take_blobs_by_indices(&indices, "blobs")
            .await
            .unwrap();

        // Same rows addressed by row id (fragment id in the high 32 bits).
        let row_ids = fragments
            .iter()
            .map(|frag| (frag.id << 32) + 2)
            .collect::<Vec<_>>();
        let blobs2 = fixture.dataset.take_blobs(&row_ids, "blobs").await.unwrap();

        for (blob1, blob2) in blobs.iter().zip(blobs2.iter()) {
            assert_eq!(blob1.position(), blob2.position());
            assert_eq!(blob1.size(), blob2.size());
            assert_eq!(blob1.data_path(), blob2.data_path());
        }
    }

    #[tokio::test]
    pub async fn test_take_blob_id_not_exist() {
        let fixture = BlobTestFixture::new().await;

        let err = fixture.dataset.take_blobs(&[1000], "blobs").await;

        assert!(matches!(err, Err(Error::InvalidInput { .. })));
    }

    #[tokio::test]
    pub async fn test_take_blob_not_blob_col() {
        let fixture = BlobTestFixture::new().await;

        let err = fixture.dataset.take_blobs(&[0], "filterme").await;

        assert!(matches!(err, Err(Error::InvalidInput { .. })));
        assert!(err.unwrap_err().to_string().contains("not a blob column"));
    }

    #[tokio::test]
    pub async fn test_scan_blobs() {
        let fixture = BlobTestFixture::new().await;

        // Unfiltered scan: blob column surfaces as a struct column.
        let batches = fixture
            .dataset
            .scan()
            .project(&["blobs"])
            .unwrap()
            .try_into_stream()
            .await
            .unwrap();

        let schema = batches.schema();

        assert!(schema.fields[0].data_type().is_struct());

        let batches = batches.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 10);
        for batch in batches.iter() {
            assert_eq!(batch.num_columns(), 1);
            assert!(batch.column(0).data_type().is_struct());
        }

        // Filtered scan: same shape, single matching batch.
        let batches = fixture
            .dataset
            .scan()
            .project(&["blobs"])
            .unwrap()
            .filter("filterme = 50")
            .unwrap()
            .try_into_stream()
            .await
            .unwrap();

        let schema = batches.schema();

        assert!(schema.fields[0].data_type().is_struct());

        let batches = batches.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches.len(), 1);
        for batch in batches.iter() {
            assert_eq!(batch.num_columns(), 1);
            assert!(batch.column(0).data_type().is_struct());
        }
    }

    #[tokio::test]
    pub async fn test_take_blobs_by_indices_with_stable_row_ids() {
        use crate::dataset::WriteParams;
        use arrow_array::RecordBatchIterator;

        let test_dir = TempStrDir::default();

        let data = lance_datagen::gen_batch()
            .col("filterme", array::step::<UInt64Type>())
            .col("blobs", array::blob())
            .into_reader_rows(RowCount::from(6), BatchCount::from(1))
            .map(|batch| Ok(batch.unwrap()))
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // max_rows_per_file=3 splits the 6 rows into two fragments.
        let write_params = WriteParams {
            enable_stable_row_ids: true,
            max_rows_per_file: 3, ..Default::default()
        };

        let reader = RecordBatchIterator::new(data.clone().into_iter().map(Ok), data[0].schema());
        let dataset = Arc::new(
            Dataset::write(reader, &test_dir, Some(write_params))
                .await
                .unwrap(),
        );

        let fragments = dataset.fragments();
        assert!(
            fragments.len() >= 2,
            "Expected at least 2 fragments, got {}",
            fragments.len()
        );

        let blobs = dataset
            .take_blobs_by_indices(&[0, 1, 2], "blobs")
            .await
            .unwrap();
        assert_eq!(blobs.len(), 3, "First fragment blobs should have 3 items");

        for blob in &blobs {
            let content = blob.read().await.unwrap();
            assert!(!content.is_empty(), "Blob content should not be empty");
        }

        let blobs = dataset
            .take_blobs_by_indices(&[3, 4, 5], "blobs")
            .await
            .unwrap();
        assert_eq!(blobs.len(), 3, "Second fragment blobs should have 3 items");

        for blob in &blobs {
            let content = blob.read().await.unwrap();
            assert!(!content.is_empty(), "Blob content should not be empty");
        }

        let blobs = dataset
            .take_blobs_by_indices(&[1, 4], "blobs")
            .await
            .unwrap();
        assert_eq!(blobs.len(), 2, "Mixed fragment blobs should have 2 items");
    }

    #[test]
    fn test_data_file_key_from_path() {
        assert_eq!(data_file_key_from_path("data/abc.lance"), "abc");
        assert_eq!(data_file_key_from_path("abc.lance"), "abc");
        assert_eq!(data_file_key_from_path("nested/path/xyz"), "xyz");
    }

    #[tokio::test]
    async fn test_write_and_take_blobs_with_blob_array_builder() {
        let test_dir = TempStrDir::default();

        let mut blob_builder = BlobArrayBuilder::new(2);
        blob_builder.push_bytes(b"hello").unwrap();
        blob_builder.push_bytes(b"world").unwrap();
        let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap();

        let id_array: arrow_array::ArrayRef = Arc::new(UInt32Array::from(vec![0, 1]));
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::UInt32, false),
            blob_field("blob", true),
        ]));

        let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap();
        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());

        let params = WriteParams::with_storage_version(LanceFileVersion::V2_2);
        let dataset = Arc::new(
            Dataset::write(reader, &test_dir, Some(params))
                .await
                .unwrap(),
        );

        let blobs = dataset
            .take_blobs_by_indices(&[0, 1], "blob")
            .await
            .unwrap();

        assert_eq!(blobs.len(), 2);
        let first = blobs[0].read().await.unwrap();
        let second = blobs[1].read().await.unwrap();
        assert_eq!(first.as_ref(), b"hello");
        assert_eq!(second.as_ref(), b"world");
    }

    // Builds an (id, blob) schema, optionally setting the dedicated-size
    // threshold override in the blob field's metadata.
    fn build_schema_with_meta(threshold_opt: Option<usize>) -> Arc<Schema> {
        let mut blob_field_with_meta = blob_field("blob", true);
        if let Some(threshold) = threshold_opt {
            let mut metadata = blob_field_with_meta.metadata().clone();
            metadata.insert(
                BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY.to_string(),
                threshold.to_string(),
            );
            blob_field_with_meta = blob_field_with_meta.with_metadata(metadata);
        }

        Arc::new(Schema::new(vec![
            Field::new("id", DataType::UInt32, false),
            blob_field_with_meta,
        ]))
    }

    // Writes zero-filled blobs of the given sizes (with an optional threshold
    // override) and returns the storage kind chosen for each blob.
    async fn write_then_get_blob_kinds(
        blob_sizes: Vec<usize>,
        threshold_opt: Option<usize>,
    ) -> Vec<BlobKind> {
        let test_dir = TempStrDir::default();

        let mut blob_builder = BlobArrayBuilder::new(blob_sizes.len());
        for size in &blob_sizes {
            blob_builder.push_bytes(vec![0u8; *size]).unwrap();
        }
        let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap();

        let id_values: Vec<u32> = (0..blob_sizes.len() as u32).collect();
        let id_array: arrow_array::ArrayRef = Arc::new(UInt32Array::from(id_values));

        let schema = build_schema_with_meta(threshold_opt);

        let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap();
        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());

        let params = WriteParams::with_storage_version(LanceFileVersion::V2_2);
        let dataset = Arc::new(
            Dataset::write(reader, &test_dir, Some(params))
                .await
                .unwrap(),
        );

        let indices: Vec<u64> = (0..blob_sizes.len() as u64).collect();
        let blobs = dataset
            .take_blobs_by_indices(&indices, "blob")
            .await
            .unwrap();

        assert_eq!(blobs.len(), blob_sizes.len());

        blobs.into_iter().map(|b| b.kind()).collect()
    }

    #[tokio::test]
    async fn test_blob_v2_dedicated_threshold_ignores_non_positive_metadata() {
        let small_blob_len = super::DEDICATED_THRESHOLD / 2;
        let large_blob_len = super::DEDICATED_THRESHOLD + 1;

        assert!(small_blob_len > super::INLINE_MAX);

        let cases = vec![(None, "no_metadata"), (Some(0), "zero_threshold")];

        for (threshold_opt, label) in cases {
            let kinds =
                write_then_get_blob_kinds(vec![small_blob_len, large_blob_len], threshold_opt)
                    .await;

            assert_eq!(kinds.len(), 2, "case: {label}");
            assert_eq!(kinds[0], BlobKind::Packed, "case: {label}");
            assert_eq!(kinds[1], BlobKind::Dedicated, "case: {label}");
        }
    }

    // NOTE(review): despite the name, this asserts that a smaller-than-default
    // override is NOT applied (the blob stays Packed), which matches the
    // writer's filter that only honors overrides above the default threshold.
    #[tokio::test]
    async fn test_blob_v2_dedicated_threshold_respects_smaller_metadata() {
        let blob_len = super::DEDICATED_THRESHOLD / 2;
        let overridden_threshold = super::DEDICATED_THRESHOLD / 4;

        assert!(blob_len > super::INLINE_MAX);
        assert!(overridden_threshold > 0);
        assert!(blob_len > overridden_threshold);

        let kinds = write_then_get_blob_kinds(vec![blob_len], Some(overridden_threshold)).await;

        assert_eq!(kinds.len(), 1);
        assert_eq!(kinds[0], BlobKind::Packed);
    }

    #[tokio::test]
    async fn test_blob_v2_dedicated_threshold_respects_larger_metadata() {
        let blob_len = super::DEDICATED_THRESHOLD + 1;
        let overridden_threshold = super::DEDICATED_THRESHOLD * 2;

        assert!(blob_len > super::INLINE_MAX);
        assert!(blob_len < overridden_threshold);

        let kinds = write_then_get_blob_kinds(vec![blob_len], Some(overridden_threshold)).await;

        assert_eq!(kinds.len(), 1);
        assert_eq!(kinds[0], BlobKind::Packed);
    }
}