Skip to main content

lance/dataset/
blob.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::HashMap, future::Future, ops::DerefMut, sync::Arc};
5
6use arrow::array::AsArray;
7use arrow::datatypes::{UInt32Type, UInt64Type, UInt8Type};
8use arrow_array::builder::{LargeBinaryBuilder, PrimitiveBuilder, StringBuilder};
9use arrow_array::Array;
10use arrow_array::RecordBatch;
11use arrow_schema::DataType as ArrowDataType;
12use lance_arrow::{FieldExt, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY};
13use lance_io::object_store::{ObjectStore, ObjectStoreParams, ObjectStoreRegistry};
14use object_store::path::Path;
15use snafu::location;
16use tokio::io::AsyncWriteExt;
17use tokio::sync::Mutex;
18
19use super::take::TakeBuilder;
20use super::{Dataset, ProjectionRequest};
21use arrow_array::StructArray;
22use lance_core::datatypes::{BlobKind, BlobVersion};
23use lance_core::utils::blob::blob_path;
24use lance_core::{utils::address::RowAddress, Error, Result};
25use lance_io::traits::Reader;
26
/// Dataset config key that selects the blob storage layout version.
pub const BLOB_VERSION_CONFIG_KEY: &str = "lance.blob.version";
28
29pub fn blob_version_from_config(config: &HashMap<String, String>) -> BlobVersion {
30    config
31        .get(BLOB_VERSION_CONFIG_KEY)
32        .and_then(|value| BlobVersion::from_config_value(value))
33        .unwrap_or(BlobVersion::V1)
34}
35
/// Blobs at or below this size stay inline in the data file.
const INLINE_MAX: usize = 64 * 1024; // 64KB inline cutoff
/// Blobs strictly larger than this are spilled to their own dedicated
/// sidecar file; sizes between the two cutoffs go to a shared pack file.
const DEDICATED_THRESHOLD: usize = 4 * 1024 * 1024; // 4MB dedicated cutoff
/// Size cap for a single rolling pack sidecar file.
const PACK_FILE_MAX_SIZE: usize = 1024 * 1024 * 1024; // 1GiB per packed `.blob` sidecar
39
/// Maintains rolling `.blob` sidecar files for packed blobs.
///
/// Layout: `data/{data_file_key}/{blob_id:08x}.blob` where each file is an
/// unframed concatenation of blob payloads; descriptors store (blob_id,
/// position, size) to locate each slice. A dedicated struct keeps path state
/// and rolling size separate from the per-batch preprocessor logic, so we can
/// reuse the same writer across rows and close/roll files cleanly on finish.
struct PackWriter {
    // Store used to create the sidecar files.
    object_store: ObjectStore,
    // Directory holding the dataset's data files and blob sidecars.
    data_dir: Path,
    // Stem of the data file these sidecars are named after.
    data_file_key: String,
    // Size cap for a single pack file; exceeding it rolls to a new file.
    max_pack_size: usize,
    // Blob id (file name) of the currently open pack file, if any.
    current_blob_id: Option<u32>,
    // Writer for the currently open pack file, if any.
    writer: Option<lance_io::object_writer::ObjectWriter>,
    // Bytes written to the currently open pack file so far.
    current_size: usize,
}
55
56impl PackWriter {
57    fn new(object_store: ObjectStore, data_dir: Path, data_file_key: String) -> Self {
58        Self {
59            object_store,
60            data_dir,
61            data_file_key,
62            max_pack_size: PACK_FILE_MAX_SIZE,
63            current_blob_id: None,
64            writer: None,
65            current_size: 0,
66        }
67    }
68
69    async fn start_new_pack(&mut self, blob_id: u32) -> Result<()> {
70        let path = blob_path(&self.data_dir, &self.data_file_key, blob_id);
71        let writer = self.object_store.create(&path).await?;
72        self.writer = Some(writer);
73        self.current_blob_id = Some(blob_id);
74        self.current_size = 0;
75        Ok(())
76    }
77
78    /// Append `data` to the current `.blob` file, rolling to a new file when
79    /// `max_pack_size` would be exceeded.
80    ///
81    /// alloc_blob_id: called only when a new pack file is opened; returns the
82    /// blob_id used as the file name.
83    ///
84    /// Returns `(blob_id, position)` where
85    /// position is the start offset of this payload in that pack file.
86    async fn write_with_allocator<F>(
87        &mut self,
88        alloc_blob_id: &mut F,
89        data: &[u8],
90    ) -> Result<(u32, u64)>
91    where
92        F: FnMut() -> u32,
93    {
94        let len = data.len();
95        if self
96            .current_blob_id
97            .map(|_| self.current_size + len > self.max_pack_size)
98            .unwrap_or(true)
99        {
100            let blob_id = alloc_blob_id();
101            self.finish().await?;
102            self.start_new_pack(blob_id).await?;
103        }
104
105        let writer = self.writer.as_mut().expect("pack writer is initialized");
106        let position = self.current_size as u64;
107        writer.write_all(data).await?;
108        self.current_size += len;
109        Ok((self.current_blob_id.expect("pack blob id"), position))
110    }
111
112    async fn finish(&mut self) -> Result<()> {
113        if let Some(mut writer) = self.writer.take() {
114            writer.shutdown().await?;
115        }
116        self.current_blob_id = None;
117        self.current_size = 0;
118        Ok(())
119    }
120}
121
/// Preprocesses blob v2 columns on the write path so the encoder only sees lightweight descriptors:
///
/// - Spills large blobs to sidecar files before encoding, reducing memory/CPU and avoiding copying huge payloads through page builders.
/// - Emits `blob_id/blob_size` tied to the data file stem, giving readers a stable path independent of temporary fragment IDs assigned during write.
/// - Leaves small inline blobs and URI rows unchanged for compatibility.
pub struct BlobPreprocessor {
    // Store where dedicated and packed sidecar files are written.
    object_store: ObjectStore,
    // Directory holding the dataset's data files and blob sidecars.
    data_dir: Path,
    // Stem of the data file the sidecars are named after.
    data_file_key: String,
    // Next blob id to hand out; starts at 1 (see `new`).
    local_counter: u32,
    // Rolling writer for packed (mid-sized) blobs.
    pack_writer: PackWriter,
}
134
impl BlobPreprocessor {
    /// Create a preprocessor that writes sidecar files next to the data file
    /// identified by `data_file_key` under `data_dir`.
    pub(crate) fn new(object_store: ObjectStore, data_dir: Path, data_file_key: String) -> Self {
        let pack_writer = PackWriter::new(
            object_store.clone(),
            data_dir.clone(),
            data_file_key.clone(),
        );
        Self {
            object_store,
            data_dir,
            data_file_key,
            // Start at 1 to avoid a potential all-zero blob_id value.
            local_counter: 1,
            pack_writer,
        }
    }

    /// Allocate the next file-local blob id (monotonically increasing).
    fn next_blob_id(&mut self) -> u32 {
        let id = self.local_counter;
        self.local_counter += 1;
        id
    }

    /// Write `data` to its own dedicated `.blob` sidecar file named by
    /// `blob_id`; returns the path written.
    async fn write_dedicated(&mut self, blob_id: u32, data: &[u8]) -> Result<Path> {
        let path = blob_path(&self.data_dir, &self.data_file_key, blob_id);
        let mut writer = self.object_store.create(&path).await?;
        writer.write_all(data).await?;
        writer.shutdown().await?;
        Ok(path)
    }

    /// Append `data` to the rolling pack file; returns `(blob_id, position)`
    /// identifying the pack file and the payload's offset within it.
    async fn write_packed(&mut self, data: &[u8]) -> Result<(u32, u64)> {
        // Split-borrow the counter and the pack writer so the allocator
        // closure can bump the counter while the writer is mutably borrowed.
        let (counter, pack_writer) = (&mut self.local_counter, &mut self.pack_writer);
        pack_writer
            .write_with_allocator(
                &mut || {
                    let id = *counter;
                    *counter += 1;
                    id
                },
                data,
            )
            .await
    }

    /// Rewrite every blob v2 column of `batch` into a descriptor struct with
    /// children `kind/data/uri/blob_id/blob_size/position`.
    ///
    /// Per non-null row:
    /// - data larger than the dedicated threshold -> spilled to its own
    ///   sidecar file (`Dedicated`);
    /// - data larger than `INLINE_MAX` -> appended to a rolling pack file
    ///   (`Packed`);
    /// - otherwise a row with a uri -> `External` (NOTE(review): below the
    ///   thresholds a uri takes precedence over inline data, but above them
    ///   data wins over the uri — confirm this precedence is intentional);
    /// - otherwise small data -> `Inline`; rows with neither stay all-null.
    ///
    /// Non-blob columns are passed through unchanged.
    pub(crate) async fn preprocess_batch(&mut self, batch: &RecordBatch) -> Result<RecordBatch> {
        let mut new_columns = Vec::with_capacity(batch.num_columns());
        let mut new_fields = Vec::with_capacity(batch.num_columns());

        for (array, field) in batch.columns().iter().zip(batch.schema().fields()) {
            if !field.is_blob_v2() {
                new_columns.push(array.clone());
                new_fields.push(field.clone());
                continue;
            }

            // Per-field metadata may only *raise* the dedicated threshold;
            // values at or below the default are ignored (NOTE(review):
            // confirm that lowering is intentionally disallowed).
            let dedicated_threshold = field
                .metadata()
                .get(BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY)
                .and_then(|value| value.parse::<usize>().ok())
                .filter(|&value| value > DEDICATED_THRESHOLD)
                .unwrap_or(DEDICATED_THRESHOLD);

            let struct_arr = array
                .as_any()
                .downcast_ref::<arrow_array::StructArray>()
                .ok_or_else(|| {
                    Error::invalid_input("Blob column was not a struct array", location!())
                })?;

            let data_col = struct_arr
                .column_by_name("data")
                .ok_or_else(|| {
                    Error::invalid_input("Blob struct missing `data` field", location!())
                })?
                .as_binary::<i64>();
            let uri_col = struct_arr
                .column_by_name("uri")
                .ok_or_else(|| {
                    Error::invalid_input("Blob struct missing `uri` field", location!())
                })?
                .as_string::<i32>();

            // One builder per descriptor child; every branch below must
            // append exactly one value to each builder to keep rows aligned.
            let mut data_builder = LargeBinaryBuilder::with_capacity(struct_arr.len(), 0);
            let mut uri_builder = StringBuilder::with_capacity(struct_arr.len(), 0);
            let mut blob_id_builder =
                PrimitiveBuilder::<arrow_array::types::UInt32Type>::with_capacity(struct_arr.len());
            let mut blob_size_builder =
                PrimitiveBuilder::<arrow_array::types::UInt64Type>::with_capacity(struct_arr.len());
            let mut kind_builder = PrimitiveBuilder::<UInt8Type>::with_capacity(struct_arr.len());
            let mut position_builder =
                PrimitiveBuilder::<arrow_array::types::UInt64Type>::with_capacity(struct_arr.len());

            let struct_nulls = struct_arr.nulls();

            for i in 0..struct_arr.len() {
                // Null row: all children null.
                if struct_arr.is_null(i) {
                    data_builder.append_null();
                    uri_builder.append_null();
                    blob_id_builder.append_null();
                    blob_size_builder.append_null();
                    kind_builder.append_null();
                    position_builder.append_null();
                    continue;
                }

                let has_data = !data_col.is_null(i);
                let has_uri = !uri_col.is_null(i);
                let data_len = if has_data { data_col.value(i).len() } else { 0 };

                // Very large payload: spill to a dedicated sidecar file.
                if has_data && data_len > dedicated_threshold {
                    let blob_id = self.next_blob_id();
                    self.write_dedicated(blob_id, data_col.value(i)).await?;

                    kind_builder.append_value(BlobKind::Dedicated as u8);
                    data_builder.append_null();
                    uri_builder.append_null();
                    blob_id_builder.append_value(blob_id);
                    blob_size_builder.append_value(data_len as u64);
                    position_builder.append_null();
                    continue;
                }

                // Mid-sized payload: append to the rolling pack file.
                if has_data && data_len > INLINE_MAX {
                    let (pack_blob_id, position) = self.write_packed(data_col.value(i)).await?;

                    kind_builder.append_value(BlobKind::Packed as u8);
                    data_builder.append_null();
                    uri_builder.append_null();
                    blob_id_builder.append_value(pack_blob_id);
                    blob_size_builder.append_value(data_len as u64);
                    position_builder.append_value(position);
                    continue;
                }

                // URI row: pass the external reference through.
                if has_uri {
                    let uri_val = uri_col.value(i);
                    kind_builder.append_value(BlobKind::External as u8);
                    data_builder.append_null();
                    uri_builder.append_value(uri_val);
                    blob_id_builder.append_null();
                    blob_size_builder.append_null();
                    position_builder.append_null();
                    continue;
                }

                // Small payload stays inline; neither data nor uri -> all null.
                if has_data {
                    kind_builder.append_value(BlobKind::Inline as u8);
                    let value = data_col.value(i);
                    data_builder.append_value(value);
                    uri_builder.append_null();
                    blob_id_builder.append_null();
                    blob_size_builder.append_null();
                    position_builder.append_null();
                } else {
                    data_builder.append_null();
                    uri_builder.append_null();
                    blob_id_builder.append_null();
                    blob_size_builder.append_null();
                    kind_builder.append_null();
                    position_builder.append_null();
                }
            }

            // Descriptor schema consumed by the reader (see
            // collect_blob_files_v2's column ordering expectations).
            let child_fields = vec![
                arrow_schema::Field::new("kind", ArrowDataType::UInt8, true),
                arrow_schema::Field::new("data", ArrowDataType::LargeBinary, true),
                arrow_schema::Field::new("uri", ArrowDataType::Utf8, true),
                arrow_schema::Field::new("blob_id", ArrowDataType::UInt32, true),
                arrow_schema::Field::new("blob_size", ArrowDataType::UInt64, true),
                arrow_schema::Field::new("position", ArrowDataType::UInt64, true),
            ];

            let struct_array = arrow_array::StructArray::try_new(
                child_fields.clone().into(),
                vec![
                    Arc::new(kind_builder.finish()),
                    Arc::new(data_builder.finish()),
                    Arc::new(uri_builder.finish()),
                    Arc::new(blob_id_builder.finish()),
                    Arc::new(blob_size_builder.finish()),
                    Arc::new(position_builder.finish()),
                ],
                struct_nulls.cloned(),
            )?;

            new_columns.push(Arc::new(struct_array));
            // Preserve the original field's metadata so blob markers survive.
            new_fields.push(Arc::new(
                arrow_schema::Field::new(
                    field.name(),
                    ArrowDataType::Struct(child_fields.into()),
                    field.is_nullable(),
                )
                .with_metadata(field.metadata().clone()),
            ));
        }

        let new_schema = Arc::new(arrow_schema::Schema::new_with_metadata(
            new_fields
                .iter()
                .map(|f| f.as_ref().clone())
                .collect::<Vec<_>>(),
            batch.schema().metadata().clone(),
        ));

        RecordBatch::try_new(new_schema, new_columns)
            .map_err(|e| Error::invalid_input(e.to_string(), location!()))
    }

    /// Flush and close any open pack sidecar file.
    pub(crate) async fn finish(&mut self) -> Result<()> {
        self.pack_writer.finish().await
    }
}
347
348pub fn schema_has_blob_v2(schema: &lance_core::datatypes::Schema) -> bool {
349    schema.fields.iter().any(|f| f.is_blob_v2())
350}
351
352pub async fn preprocess_blob_batches(
353    batches: &[RecordBatch],
354    pre: &mut BlobPreprocessor,
355) -> Result<Vec<RecordBatch>> {
356    let mut out = Vec::with_capacity(batches.len());
357    for batch in batches {
358        out.push(pre.preprocess_batch(batch).await?);
359    }
360    Ok(out)
361}
362
/// Current state of the reader.  Held in a mutex for easy sharing
///
/// The u64 is the cursor in the file that the reader is currently at
/// (note that seeks are allowed before the file is opened)
#[derive(Debug)]
enum ReaderState {
    /// No reader opened yet; holds the pending cursor position.
    Uninitialized(u64),
    /// Reader is open; holds the cursor and the shared reader handle.
    Open((u64, Arc<dyn Reader>)),
    /// The blob file was closed; further reads and seeks fail.
    Closed,
}
373
/// A file-like object that represents a blob in a dataset
#[derive(Debug)]
pub struct BlobFile {
    // Store the blob's bytes are read from.
    object_store: Arc<ObjectStore>,
    // Path of the file containing the blob payload.
    path: Path,
    // Lazily opened reader plus the current read cursor.
    reader: Arc<Mutex<ReaderState>>,
    // Offset of the blob payload within `path` (0 for dedicated/external).
    position: u64,
    // Size of the blob payload in bytes.
    size: u64,
    // Storage layout this blob uses (inline/dedicated/packed/external).
    kind: BlobKind,
    // Original URI for external blobs; `None` otherwise.
    uri: Option<String>,
}
385
386impl BlobFile {
387    /// Create a new BlobFile
388    ///
389    /// See [`crate::dataset::Dataset::take_blobs`]
390    pub fn new_inline(
391        dataset: Arc<Dataset>,
392        field_id: u32,
393        row_addr: u64,
394        position: u64,
395        size: u64,
396    ) -> Self {
397        let frag_id = RowAddress::from(row_addr).fragment_id();
398        let frag = dataset.get_fragment(frag_id as usize).unwrap();
399        let data_file = frag.data_file_for_field(field_id).unwrap();
400        let data_file = dataset.data_dir().child(data_file.path.as_str());
401        Self {
402            object_store: dataset.object_store.clone(),
403            path: data_file,
404            position,
405            size,
406            kind: BlobKind::Inline,
407            uri: None,
408            reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))),
409        }
410    }
411
412    pub fn new_dedicated(dataset: Arc<Dataset>, path: Path, size: u64) -> Self {
413        Self {
414            object_store: dataset.object_store.clone(),
415            path,
416            position: 0,
417            size,
418            kind: BlobKind::Dedicated,
419            uri: None,
420            reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))),
421        }
422    }
423    pub fn new_packed(dataset: Arc<Dataset>, path: Path, position: u64, size: u64) -> Self {
424        Self {
425            object_store: dataset.object_store.clone(),
426            path,
427            position,
428            size,
429            kind: BlobKind::Packed,
430            uri: None,
431            reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))),
432        }
433    }
434    pub async fn new_external(
435        uri: String,
436        size: u64,
437        registry: Arc<ObjectStoreRegistry>,
438        params: Arc<ObjectStoreParams>,
439    ) -> Result<Self> {
440        let (object_store, path) =
441            ObjectStore::from_uri_and_params(registry, &uri, &params).await?;
442        let size = if size > 0 {
443            size
444        } else {
445            object_store.size(&path).await?
446        };
447        Ok(Self {
448            object_store,
449            path,
450            position: 0,
451            size,
452            kind: BlobKind::External,
453            uri: Some(uri),
454            reader: Arc::new(Mutex::new(ReaderState::Uninitialized(0))),
455        })
456    }
457
458    /// Close the blob file, releasing any associated resources
459    pub async fn close(&self) -> Result<()> {
460        let mut reader = self.reader.lock().await;
461        *reader = ReaderState::Closed;
462        Ok(())
463    }
464
465    /// Returns true if the blob file is closed
466    pub async fn is_closed(&self) -> bool {
467        matches!(*self.reader.lock().await, ReaderState::Closed)
468    }
469
470    async fn do_with_reader<
471        T,
472        Fut: Future<Output = Result<(u64, T)>>,
473        Func: FnOnce(u64, Arc<dyn Reader>) -> Fut,
474    >(
475        &self,
476        func: Func,
477    ) -> Result<T> {
478        let mut reader = self.reader.lock().await;
479        if let ReaderState::Uninitialized(cursor) = *reader {
480            let opened = self.object_store.open(&self.path).await?;
481            let opened = Arc::<dyn Reader>::from(opened);
482            *reader = ReaderState::Open((cursor, opened.clone()));
483        }
484        match reader.deref_mut() {
485            ReaderState::Open((cursor, reader)) => {
486                let (new_cursor, data) = func(*cursor, reader.clone()).await?;
487                *cursor = new_cursor;
488                Ok(data)
489            }
490            ReaderState::Closed => Err(Error::invalid_input(
491                "Blob file is already closed".to_string(),
492                location!(),
493            )),
494            _ => unreachable!(),
495        }
496    }
497
498    /// Read the entire blob file from the current cursor position
499    /// to the end of the file
500    ///
501    /// After this call the cursor will be pointing to the end of
502    /// the file.
503    pub async fn read(&self) -> Result<bytes::Bytes> {
504        let position = self.position;
505        let size = self.size;
506        self.do_with_reader(|cursor, reader| async move {
507            let start = position as usize + cursor as usize;
508            let end = (position + size) as usize;
509            Ok((end as u64, reader.get_range(start..end).await?))
510        })
511        .await
512    }
513
514    /// Read up to `len` bytes from the current cursor position
515    ///
516    /// After this call the cursor will be pointing to the end of
517    /// the read data.
518    pub async fn read_up_to(&self, len: usize) -> Result<bytes::Bytes> {
519        let position = self.position;
520        let size = self.size;
521        self.do_with_reader(|cursor, reader| async move {
522            let start = position as usize + cursor as usize;
523            let read_size = len.min((size - cursor) as usize);
524            let end = start + read_size;
525            let data = reader.get_range(start..end).await?;
526            Ok((end as u64 - position, data))
527        })
528        .await
529    }
530
531    /// Seek to a new cursor position in the file
532    pub async fn seek(&self, new_cursor: u64) -> Result<()> {
533        let mut reader = self.reader.lock().await;
534        match reader.deref_mut() {
535            ReaderState::Open((cursor, _)) => {
536                *cursor = new_cursor;
537                Ok(())
538            }
539            ReaderState::Closed => Err(Error::invalid_input(
540                "Blob file is already closed".to_string(),
541                location!(),
542            )),
543            ReaderState::Uninitialized(cursor) => {
544                *cursor = new_cursor;
545                Ok(())
546            }
547        }
548    }
549
550    /// Return the current cursor position in the file
551    pub async fn tell(&self) -> Result<u64> {
552        let reader = self.reader.lock().await;
553        match *reader {
554            ReaderState::Open((cursor, _)) => Ok(cursor),
555            ReaderState::Closed => Err(Error::invalid_input(
556                "Blob file is already closed".to_string(),
557                location!(),
558            )),
559            ReaderState::Uninitialized(cursor) => Ok(cursor),
560        }
561    }
562
563    /// Return the size of the blob file in bytes
564    pub fn size(&self) -> u64 {
565        self.size
566    }
567
568    pub fn position(&self) -> u64 {
569        self.position
570    }
571
572    pub fn data_path(&self) -> &Path {
573        &self.path
574    }
575
576    pub fn kind(&self) -> BlobKind {
577        self.kind
578    }
579
580    pub fn uri(&self) -> Option<&str> {
581        self.uri.as_deref()
582    }
583}
584
585pub(super) async fn take_blobs(
586    dataset: &Arc<Dataset>,
587    row_ids: &[u64],
588    column: &str,
589) -> Result<Vec<BlobFile>> {
590    let projection = dataset.schema().project(&[column])?;
591    let blob_field = &projection.fields[0];
592    let blob_field_id = blob_field.id;
593    if !projection.fields[0].is_blob() {
594        return Err(Error::InvalidInput {
595            location: location!(),
596            source: format!("the column '{}' is not a blob column", column).into(),
597        });
598    }
599    let description_and_addr = dataset
600        .take_builder(row_ids, projection)?
601        .with_row_address(true)
602        .execute()
603        .await?;
604    let descriptions = description_and_addr.column(0).as_struct();
605    let row_addrs = description_and_addr.column(1).as_primitive::<UInt64Type>();
606    let blob_field_id = blob_field_id as u32;
607
608    match dataset.blob_version() {
609        BlobVersion::V1 => collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs),
610        BlobVersion::V2 => {
611            collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs).await
612        }
613    }
614}
615
616/// Take [BlobFile] by row addresses.
617///
618/// Row addresses are `u64` values encoding `(fragment_id << 32) | row_offset`.
619/// Use this method when you already have row addresses, for example from
620/// a scan with `with_row_address()`. For row IDs (stable identifiers), use
621/// [`Dataset::take_blobs`]. For row indices (offsets), use
622/// [`Dataset::take_blobs_by_indices`].
623pub async fn take_blobs_by_addresses(
624    dataset: &Arc<Dataset>,
625    row_addrs: &[u64],
626    column: &str,
627) -> Result<Vec<BlobFile>> {
628    let projection = dataset.schema().project(&[column])?;
629    let blob_field = &projection.fields[0];
630    let blob_field_id = blob_field.id;
631    if !projection.fields[0].is_blob() {
632        return Err(Error::InvalidInput {
633            location: location!(),
634            source: format!("the column '{}' is not a blob column", column).into(),
635        });
636    }
637
638    // Convert Schema to ProjectionPlan
639    let projection_request = ProjectionRequest::from(projection);
640    let projection_plan = Arc::new(projection_request.into_projection_plan(dataset.clone())?);
641
642    // Use try_new_from_addresses to bypass row ID index lookup.
643    // This is critical when enable_stable_row_ids=true because row addresses
644    // (fragment_id << 32 | row_offset) are different from row IDs (sequential integers).
645    let description_and_addr =
646        TakeBuilder::try_new_from_addresses(dataset.clone(), row_addrs.to_vec(), projection_plan)?
647            .with_row_address(true)
648            .execute()
649            .await?;
650
651    let descriptions = description_and_addr.column(0).as_struct();
652    let row_addrs_result = description_and_addr.column(1).as_primitive::<UInt64Type>();
653    let blob_field_id = blob_field_id as u32;
654
655    match dataset.blob_version() {
656        BlobVersion::V1 => {
657            collect_blob_files_v1(dataset, blob_field_id, descriptions, row_addrs_result)
658        }
659        BlobVersion::V2 => {
660            collect_blob_files_v2(dataset, blob_field_id, descriptions, row_addrs_result).await
661        }
662    }
663}
664
665fn collect_blob_files_v1(
666    dataset: &Arc<Dataset>,
667    blob_field_id: u32,
668    descriptions: &StructArray,
669    row_addrs: &arrow::array::PrimitiveArray<UInt64Type>,
670) -> Result<Vec<BlobFile>> {
671    let positions = descriptions.column(0).as_primitive::<UInt64Type>();
672    let sizes = descriptions.column(1).as_primitive::<UInt64Type>();
673
674    Ok(row_addrs
675        .values()
676        .iter()
677        .zip(positions.iter())
678        .zip(sizes.iter())
679        .filter_map(|((row_addr, position), size)| {
680            let position = position?;
681            let size = size?;
682            Some((*row_addr, position, size))
683        })
684        .map(|(row_addr, position, size)| {
685            BlobFile::new_inline(dataset.clone(), blob_field_id, row_addr, position, size)
686        })
687        .collect())
688}
689
690async fn collect_blob_files_v2(
691    dataset: &Arc<Dataset>,
692    blob_field_id: u32,
693    descriptions: &StructArray,
694    row_addrs: &arrow::array::PrimitiveArray<UInt64Type>,
695) -> Result<Vec<BlobFile>> {
696    let kinds = descriptions.column(0).as_primitive::<UInt8Type>();
697    let positions = descriptions.column(1).as_primitive::<UInt64Type>();
698    let sizes = descriptions.column(2).as_primitive::<UInt64Type>();
699    let blob_ids = descriptions.column(3).as_primitive::<UInt32Type>();
700    let blob_uris = descriptions.column(4).as_string::<i32>();
701
702    let mut files = Vec::with_capacity(row_addrs.len());
703    for (idx, row_addr) in row_addrs.values().iter().enumerate() {
704        let kind = BlobKind::try_from(kinds.value(idx))?;
705
706        // Struct is non-nullable; null rows are encoded as inline with zero position/size and empty uri
707        if matches!(kind, BlobKind::Inline) && positions.value(idx) == 0 && sizes.value(idx) == 0 {
708            continue;
709        }
710
711        match kind {
712            BlobKind::Inline => {
713                let position = positions.value(idx);
714                let size = sizes.value(idx);
715                files.push(BlobFile::new_inline(
716                    dataset.clone(),
717                    blob_field_id,
718                    *row_addr,
719                    position,
720                    size,
721                ));
722            }
723            BlobKind::Dedicated => {
724                let blob_id = blob_ids.value(idx);
725                let size = sizes.value(idx);
726                let frag_id = RowAddress::from(*row_addr).fragment_id();
727                let frag =
728                    dataset
729                        .get_fragment(frag_id as usize)
730                        .ok_or_else(|| Error::Internal {
731                            message: "Fragment not found".to_string(),
732                            location: location!(),
733                        })?;
734                let data_file =
735                    frag.data_file_for_field(blob_field_id)
736                        .ok_or_else(|| Error::Internal {
737                            message: "Data file not found for blob field".to_string(),
738                            location: location!(),
739                        })?;
740
741                let data_file_key = data_file_key_from_path(data_file.path.as_str());
742                let path = blob_path(&dataset.data_dir(), data_file_key, blob_id);
743                files.push(BlobFile::new_dedicated(dataset.clone(), path, size));
744            }
745            BlobKind::Packed => {
746                let blob_id = blob_ids.value(idx);
747                let size = sizes.value(idx);
748                let position = positions.value(idx);
749                let frag_id = RowAddress::from(*row_addr).fragment_id();
750                let frag =
751                    dataset
752                        .get_fragment(frag_id as usize)
753                        .ok_or_else(|| Error::Internal {
754                            message: "Fragment not found".to_string(),
755                            location: location!(),
756                        })?;
757                let data_file =
758                    frag.data_file_for_field(blob_field_id)
759                        .ok_or_else(|| Error::Internal {
760                            message: "Data file not found for blob field".to_string(),
761                            location: location!(),
762                        })?;
763                let data_file_key = data_file_key_from_path(data_file.path.as_str());
764                let path = blob_path(&dataset.data_dir(), data_file_key, blob_id);
765                files.push(BlobFile::new_packed(dataset.clone(), path, position, size));
766            }
767            BlobKind::External => {
768                let uri = blob_uris.value(idx).to_string();
769                let size = sizes.value(idx);
770                let registry = dataset.session.store_registry();
771                let params = dataset
772                    .store_params
773                    .as_ref()
774                    .map(|p| Arc::new((**p).clone()))
775                    .unwrap_or_else(|| Arc::new(ObjectStoreParams::default()));
776                files.push(BlobFile::new_external(uri, size, registry, params).await?);
777            }
778        }
779    }
780
781    Ok(files)
782}
783
/// Derive the data-file key (file stem) from a data file path.
///
/// `"data/abcd1234.lance"` becomes `"abcd1234"`. Inputs without a directory
/// component or a `.lance` suffix pass through unchanged.
fn data_file_key_from_path(path: &str) -> &str {
    let filename = match path.rfind('/') {
        Some(slash) => &path[slash + 1..],
        None => path,
    };
    match filename.strip_suffix(".lance") {
        Some(stem) => stem,
        None => filename,
    }
}
788
789#[cfg(test)]
790mod tests {
791    use std::sync::Arc;
792
793    use arrow::{array::AsArray, datatypes::UInt64Type};
794    use arrow_array::RecordBatch;
795    use arrow_array::{RecordBatchIterator, UInt32Array};
796    use arrow_schema::{DataType, Field, Schema};
797    use futures::TryStreamExt;
798    use lance_arrow::{DataTypeExt, BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY};
799    use lance_io::stream::RecordBatchStream;
800
801    use lance_core::datatypes::BlobKind;
802    use lance_core::{utils::tempfile::TempStrDir, Error, Result};
803    use lance_datagen::{array, BatchCount, RowCount};
804    use lance_file::version::LanceFileVersion;
805
806    use super::data_file_key_from_path;
807    use crate::{
808        blob::{blob_field, BlobArrayBuilder},
809        dataset::WriteParams,
810        utils::test::TestDatasetGenerator,
811        Dataset,
812    };
813
    /// Shared fixture for the blob tests: a temporary on-disk dataset with a
    /// `filterme` (u64 step) column and a `blobs` blob column.
    struct BlobTestFixture {
        // Held only to keep the temp directory alive for the dataset's lifetime.
        _test_dir: TempStrDir,
        // The dataset written into `_test_dir`.
        dataset: Arc<Dataset>,
        // The source batches; assertions compare blob reads against these.
        data: Vec<RecordBatch>,
    }
819
820    impl BlobTestFixture {
821        async fn new() -> Self {
822            let test_dir = TempStrDir::default();
823
824            let data = lance_datagen::gen_batch()
825                .col("filterme", array::step::<UInt64Type>())
826                .col("blobs", array::blob())
827                .into_reader_rows(RowCount::from(10), BatchCount::from(10))
828                .map(|batch| Ok(batch?))
829                .collect::<Result<Vec<_>>>()
830                .unwrap();
831
832            let dataset = Arc::new(
833                TestDatasetGenerator::new(data.clone(), LanceFileVersion::default())
834                    .make_hostile(&test_dir)
835                    .await,
836            );
837
838            Self {
839                _test_dir: test_dir,
840                dataset,
841                data,
842            }
843        }
844    }
845
    /// Row-id based `take_blobs` returns exactly the bytes that were written.
    #[tokio::test]
    pub async fn test_take_blobs() {
        let fixture = BlobTestFixture::new().await;

        // Scan only row ids (empty projection) of rows with filterme >= 50.
        let row_ids = fixture
            .dataset
            .scan()
            .project::<String>(&[])
            .unwrap()
            .filter("filterme >= 50")
            .unwrap()
            .with_row_id()
            .try_into_batch()
            .await
            .unwrap();
        let row_ids = row_ids.column(0).as_primitive::<UInt64Type>().values();
        // Pick the 5th, 17th and 33rd matching rows. With filterme stepping
        // over 100 rows (assuming the step starts at 0) and 10 rows per source
        // batch, these are global rows 55, 67 and 83 — i.e. (batch 5, row 5),
        // (batch 6, row 7), (batch 8, row 3) in the loop below.
        let row_ids = vec![row_ids[5], row_ids[17], row_ids[33]];

        let blobs = fixture.dataset.take_blobs(&row_ids, "blobs").await.unwrap();

        for (actual_idx, (expected_batch_idx, expected_row_idx)) in
            [(5, 5), (6, 7), (8, 3)].iter().enumerate()
        {
            // Compare blob bytes against column 1 ("blobs") of the source batch.
            let val = blobs[actual_idx].read().await.unwrap();
            let expected = fixture.data[*expected_batch_idx]
                .column(1)
                .as_binary::<i64>()
                .value(*expected_row_idx);

            assert_eq!(&val, expected);
        }
    }
878
    /// `take_blobs_by_indices` must agree with `take_blobs`: taking the third
    /// row (offset 2) of each fragment by dataset-wide row index resolves to
    /// the same stored blob slice as taking it by row id.
    #[tokio::test]
    pub async fn test_take_blobs_by_indices() {
        let fixture = BlobTestFixture::new().await;

        let fragments = fixture.dataset.fragments();
        assert!(fragments.len() >= 2);
        let mut indices = Vec::with_capacity(fragments.len());
        // Running global row index of offset 2 within the current fragment.
        let mut last = 2;

        for frag in fragments.iter() {
            indices.push(last as u64);
            last += frag.num_rows().unwrap_or(0);
        }
        indices.pop();

        // Row indices
        assert_eq!(indices, [2, 12, 22, 32, 42, 52, 62, 72, 82]);
        let blobs = fixture
            .dataset
            .take_blobs_by_indices(&indices, "blobs")
            .await
            .unwrap();

        // Row IDs
        // Address form: fragment id in the high 32 bits, offset 2 in the low.
        let row_ids = fragments
            .iter()
            .map(|frag| (frag.id << 32) + 2)
            .collect::<Vec<_>>();
        let blobs2 = fixture.dataset.take_blobs(&row_ids, "blobs").await.unwrap();

        // NOTE: `indices` has one fewer entry than `row_ids` (the pop above),
        // so zip compares only the first fragments.len() - 1 pairs.
        for (blob1, blob2) in blobs.iter().zip(blobs2.iter()) {
            assert_eq!(blob1.position(), blob2.position());
            assert_eq!(blob1.size(), blob2.size());
            assert_eq!(blob1.data_path(), blob2.data_path());
        }
    }
915
916    #[tokio::test]
917    pub async fn test_take_blob_id_not_exist() {
918        let fixture = BlobTestFixture::new().await;
919
920        let err = fixture.dataset.take_blobs(&[1000], "blobs").await;
921
922        assert!(matches!(err, Err(Error::InvalidInput { .. })));
923    }
924
925    #[tokio::test]
926    pub async fn test_take_blob_not_blob_col() {
927        let fixture = BlobTestFixture::new().await;
928
929        let err = fixture.dataset.take_blobs(&[0], "filterme").await;
930
931        assert!(matches!(err, Err(Error::InvalidInput { .. })));
932        assert!(err.unwrap_err().to_string().contains("not a blob column"));
933    }
934
    /// Scanning a blob column yields a struct column (blob descriptors), not
    /// raw bytes — both for a plain scan and a filtered scan.
    #[tokio::test]
    pub async fn test_scan_blobs() {
        let fixture = BlobTestFixture::new().await;

        // By default, scanning a blob column will load descriptions
        let batches = fixture
            .dataset
            .scan()
            .project(&["blobs"])
            .unwrap()
            .try_into_stream()
            .await
            .unwrap();

        let schema = batches.schema();

        // The projected blob column surfaces as a struct, not binary data.
        assert!(schema.fields[0].data_type().is_struct());

        let batches = batches.try_collect::<Vec<_>>().await.unwrap();

        // One output batch per source batch (the fixture writes 10 batches).
        assert_eq!(batches.len(), 10);
        for batch in batches.iter() {
            assert_eq!(batch.num_columns(), 1);
            assert!(batch.column(0).data_type().is_struct());
        }

        // Should also be able to scan with filter
        let batches = fixture
            .dataset
            .scan()
            .project(&["blobs"])
            .unwrap()
            .filter("filterme = 50")
            .unwrap()
            .try_into_stream()
            .await
            .unwrap();

        let schema = batches.schema();

        assert!(schema.fields[0].data_type().is_struct());

        let batches = batches.try_collect::<Vec<_>>().await.unwrap();

        // Exactly one row matches filterme = 50, so a single batch comes back.
        assert_eq!(batches.len(), 1);
        for batch in batches.iter() {
            assert_eq!(batch.num_columns(), 1);
            assert!(batch.column(0).data_type().is_struct());
        }
    }
985
986    /// Test that take_blobs_by_indices works correctly with enable_stable_row_ids=true.
987    ///
988    /// This is a regression test for a bug where take_blobs_by_indices would fail
989    /// with "index out of bounds" for fragment 1+ when stable row IDs are enabled.
990    /// The bug was caused by passing row addresses (from row_offsets_to_row_addresses)
991    /// to blob::take_blobs which expected row IDs. When stable row IDs are enabled,
992    /// row addresses (fragment_id << 32 | offset) are different from row IDs
993    /// (sequential integers), causing the row ID index lookup to fail for fragment 1+.
994    #[tokio::test]
995    pub async fn test_take_blobs_by_indices_with_stable_row_ids() {
996        use crate::dataset::WriteParams;
997        use arrow_array::RecordBatchIterator;
998
999        let test_dir = TempStrDir::default();
1000
1001        // Create test data with blob column
1002        let data = lance_datagen::gen_batch()
1003            .col("filterme", array::step::<UInt64Type>())
1004            .col("blobs", array::blob())
1005            .into_reader_rows(RowCount::from(6), BatchCount::from(1))
1006            .map(|batch| Ok(batch.unwrap()))
1007            .collect::<Result<Vec<_>>>()
1008            .unwrap();
1009
1010        // Write with enable_stable_row_ids=true and force multiple fragments
1011        let write_params = WriteParams {
1012            enable_stable_row_ids: true,
1013            max_rows_per_file: 3, // Force 2 fragments with 3 rows each
1014            ..Default::default()
1015        };
1016
1017        let reader = RecordBatchIterator::new(data.clone().into_iter().map(Ok), data[0].schema());
1018        let dataset = Arc::new(
1019            Dataset::write(reader, &test_dir, Some(write_params))
1020                .await
1021                .unwrap(),
1022        );
1023
1024        // Verify we have multiple fragments
1025        let fragments = dataset.fragments();
1026        assert!(
1027            fragments.len() >= 2,
1028            "Expected at least 2 fragments, got {}",
1029            fragments.len()
1030        );
1031
1032        // Test first fragment (indices 0, 1, 2) - this always worked
1033        let blobs = dataset
1034            .take_blobs_by_indices(&[0, 1, 2], "blobs")
1035            .await
1036            .unwrap();
1037        assert_eq!(blobs.len(), 3, "First fragment blobs should have 3 items");
1038
1039        // Verify we can read the blob content
1040        for blob in &blobs {
1041            let content = blob.read().await.unwrap();
1042            assert!(!content.is_empty(), "Blob content should not be empty");
1043        }
1044
1045        // Test second fragment (indices 3, 4, 5) - this was failing before the fix
1046        let blobs = dataset
1047            .take_blobs_by_indices(&[3, 4, 5], "blobs")
1048            .await
1049            .unwrap();
1050        assert_eq!(blobs.len(), 3, "Second fragment blobs should have 3 items");
1051
1052        // Verify we can read the blob content from second fragment
1053        for blob in &blobs {
1054            let content = blob.read().await.unwrap();
1055            assert!(!content.is_empty(), "Blob content should not be empty");
1056        }
1057
1058        // Test mixed indices from both fragments
1059        let blobs = dataset
1060            .take_blobs_by_indices(&[1, 4], "blobs")
1061            .await
1062            .unwrap();
1063        assert_eq!(blobs.len(), 2, "Mixed fragment blobs should have 2 items");
1064    }
1065
1066    #[test]
1067    fn test_data_file_key_from_path() {
1068        assert_eq!(data_file_key_from_path("data/abc.lance"), "abc");
1069        assert_eq!(data_file_key_from_path("abc.lance"), "abc");
1070        assert_eq!(data_file_key_from_path("nested/path/xyz"), "xyz");
1071    }
1072
    /// End-to-end check for `BlobArrayBuilder`: blobs pushed through the
    /// builder round-trip through `Dataset::write` and
    /// `take_blobs_by_indices` unchanged.
    #[tokio::test]
    async fn test_write_and_take_blobs_with_blob_array_builder() {
        let test_dir = TempStrDir::default();

        // Build a blob column with the new BlobArrayBuilder
        let mut blob_builder = BlobArrayBuilder::new(2);
        blob_builder.push_bytes(b"hello").unwrap();
        blob_builder.push_bytes(b"world").unwrap();
        let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap();

        let id_array: arrow_array::ArrayRef = Arc::new(UInt32Array::from(vec![0, 1]));
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::UInt32, false),
            blob_field("blob", true),
        ]));

        let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap();
        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());

        // NOTE(review): pinned to storage version V2_2 — presumably required
        // for this blob layout; confirm against the writer's version gating.
        let params = WriteParams::with_storage_version(LanceFileVersion::V2_2);
        let dataset = Arc::new(
            Dataset::write(reader, &test_dir, Some(params))
                .await
                .unwrap(),
        );

        let blobs = dataset
            .take_blobs_by_indices(&[0, 1], "blob")
            .await
            .unwrap();

        // Read both blobs back and compare against the pushed payloads.
        assert_eq!(blobs.len(), 2);
        let first = blobs[0].read().await.unwrap();
        let second = blobs[1].read().await.unwrap();
        assert_eq!(first.as_ref(), b"hello");
        assert_eq!(second.as_ref(), b"world");
    }
1110
1111    fn build_schema_with_meta(threshold_opt: Option<usize>) -> Arc<Schema> {
1112        let mut blob_field_with_meta = blob_field("blob", true);
1113        if let Some(threshold) = threshold_opt {
1114            let mut metadata = blob_field_with_meta.metadata().clone();
1115            metadata.insert(
1116                BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY.to_string(),
1117                threshold.to_string(),
1118            );
1119            blob_field_with_meta = blob_field_with_meta.with_metadata(metadata);
1120        }
1121
1122        Arc::new(Schema::new(vec![
1123            Field::new("id", DataType::UInt32, false),
1124            blob_field_with_meta,
1125        ]))
1126    }
1127
    /// Writes one zero-filled blob per entry of `blob_sizes` (with an
    /// optional dedicated-size-threshold metadata override on the blob
    /// field), reads every blob back by index, and returns each blob's
    /// storage kind in input order.
    async fn write_then_get_blob_kinds(
        blob_sizes: Vec<usize>,
        threshold_opt: Option<usize>,
    ) -> Vec<BlobKind> {
        let test_dir = TempStrDir::default();

        // One zero-filled payload per requested size.
        let mut blob_builder = BlobArrayBuilder::new(blob_sizes.len());
        for size in &blob_sizes {
            blob_builder.push_bytes(vec![0u8; *size]).unwrap();
        }
        let blob_array: arrow_array::ArrayRef = blob_builder.finish().unwrap();

        let id_values: Vec<u32> = (0..blob_sizes.len() as u32).collect();
        let id_array: arrow_array::ArrayRef = Arc::new(UInt32Array::from(id_values));

        let schema = build_schema_with_meta(threshold_opt);

        let batch = RecordBatch::try_new(schema.clone(), vec![id_array, blob_array]).unwrap();
        let reader = RecordBatchIterator::new(vec![batch].into_iter().map(Ok), schema.clone());

        let params = WriteParams::with_storage_version(LanceFileVersion::V2_2);
        let dataset = Arc::new(
            Dataset::write(reader, &test_dir, Some(params))
                .await
                .unwrap(),
        );

        // Take every row back so callers can inspect each blob's kind.
        let indices: Vec<u64> = (0..blob_sizes.len() as u64).collect();
        let blobs = dataset
            .take_blobs_by_indices(&indices, "blob")
            .await
            .unwrap();

        assert_eq!(blobs.len(), blob_sizes.len());

        blobs.into_iter().map(|b| b.kind()).collect()
    }
1165
1166    #[tokio::test]
1167    async fn test_blob_v2_dedicated_threshold_ignores_non_positive_metadata() {
1168        let small_blob_len = super::DEDICATED_THRESHOLD / 2;
1169        let large_blob_len = super::DEDICATED_THRESHOLD + 1;
1170
1171        // Sanity check assumptions for this test
1172        assert!(small_blob_len > super::INLINE_MAX);
1173
1174        let cases = vec![(None, "no_metadata"), (Some(0), "zero_threshold")];
1175
1176        for (threshold_opt, label) in cases {
1177            let kinds =
1178                write_then_get_blob_kinds(vec![small_blob_len, large_blob_len], threshold_opt)
1179                    .await;
1180
1181            assert_eq!(kinds.len(), 2, "case: {label}");
1182            assert_eq!(kinds[0], BlobKind::Packed, "case: {label}");
1183            assert_eq!(kinds[1], BlobKind::Dedicated, "case: {label}");
1184        }
1185    }
1186
1187    #[tokio::test]
1188    async fn test_blob_v2_dedicated_threshold_respects_smaller_metadata() {
1189        let blob_len = super::DEDICATED_THRESHOLD / 2;
1190        let overridden_threshold = super::DEDICATED_THRESHOLD / 4;
1191
1192        assert!(blob_len > super::INLINE_MAX);
1193        assert!(overridden_threshold > 0);
1194        assert!(blob_len > overridden_threshold);
1195
1196        let kinds = write_then_get_blob_kinds(vec![blob_len], Some(overridden_threshold)).await;
1197
1198        assert_eq!(kinds.len(), 1);
1199        assert_eq!(kinds[0], BlobKind::Packed);
1200    }
1201
1202    #[tokio::test]
1203    async fn test_blob_v2_dedicated_threshold_respects_larger_metadata() {
1204        let blob_len = super::DEDICATED_THRESHOLD + 1;
1205        let overridden_threshold = super::DEDICATED_THRESHOLD * 2;
1206
1207        assert!(blob_len > super::INLINE_MAX);
1208        assert!(blob_len < overridden_threshold);
1209
1210        let kinds = write_then_get_blob_kinds(vec![blob_len], Some(overridden_threshold)).await;
1211
1212        assert_eq!(kinds.len(), 1);
1213        assert_eq!(kinds[0], BlobKind::Packed);
1214    }
1215}