// lance_table/utils/stream.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use arrow_array::{BooleanArray, RecordBatch, RecordBatchOptions, UInt64Array, make_array};
7use arrow_buffer::NullBuffer;
8use futures::{
9    FutureExt, Stream, StreamExt,
10    future::BoxFuture,
11    stream::{BoxStream, FuturesOrdered},
12};
13use lance_arrow::RecordBatchExt;
14use lance_core::{
15    ROW_ADDR, ROW_ADDR_FIELD, ROW_CREATED_AT_VERSION_FIELD, ROW_ID, ROW_ID_FIELD,
16    ROW_LAST_UPDATED_AT_VERSION_FIELD, Result,
17    utils::{address::RowAddress, deletion::DeletionVector},
18};
19use lance_io::ReadBatchParams;
20use tracing::instrument;
21
22use crate::rowids::RowIdSequence;
23
/// A future that will resolve to a single batch (or an error)
pub type ReadBatchFut = BoxFuture<'static, Result<RecordBatch>>;
/// A task, emitted by a file reader, that will produce a batch (of the
/// given size)
pub struct ReadBatchTask {
    /// The future that will produce the batch
    pub task: ReadBatchFut,
    /// The number of rows the produced batch will contain, known up front
    pub num_rows: u32,
}
/// A stream of batch tasks where each task's row count is known before the batch resolves
pub type ReadBatchTaskStream = BoxStream<'static, ReadBatchTask>;
/// A stream of batch futures where the row count is only known once the future resolves
pub type ReadBatchFutStream = BoxStream<'static, ReadBatchFut>;
33
/// Stream adapter that zips several [`ReadBatchTaskStream`]s column-wise.
struct MergeStream {
    /// The input streams; one task is pulled from each, in order
    streams: Vec<ReadBatchTaskStream>,
    /// Batch futures collected so far for the merged task being assembled
    next_batch: FuturesOrdered<ReadBatchFut>,
    /// Row count of the merged task being assembled (taken from the first stream)
    next_num_rows: u32,
    /// Index of the next stream to poll
    index: usize,
}
40
41impl MergeStream {
42    fn emit(&mut self) -> ReadBatchTask {
43        let mut iter = std::mem::take(&mut self.next_batch);
44        let task = async move {
45            let mut batch = iter.next().await.unwrap()?;
46            while let Some(next) = iter.next().await {
47                let next = next?;
48                batch = batch.merge(&next)?;
49            }
50            Ok(batch)
51        }
52        .boxed();
53        let num_rows = self.next_num_rows;
54        self.next_num_rows = 0;
55        ReadBatchTask { task, num_rows }
56    }
57}
58
impl Stream for MergeStream {
    type Item = ReadBatchTask;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Poll the input streams round-robin, collecting one task from each.
        // Once every stream has contributed, emit a single merged task.
        loop {
            let index = self.index;
            match self.streams[index].poll_next_unpin(cx) {
                std::task::Poll::Ready(Some(batch_task)) => {
                    if self.index == 0 {
                        // The first stream establishes the expected row count
                        self.next_num_rows = batch_task.num_rows;
                    } else {
                        // Every other stream must agree with the first
                        debug_assert_eq!(self.next_num_rows, batch_task.num_rows);
                    }
                    self.next_batch.push_back(batch_task.task);
                    self.index += 1;
                    if self.index == self.streams.len() {
                        // All streams contributed; reset for the next round and emit
                        self.index = 0;
                        let next_batch = self.emit();
                        return std::task::Poll::Ready(Some(next_batch));
                    }
                }
                std::task::Poll::Ready(None) => {
                    // End as soon as any input stream finishes
                    return std::task::Poll::Ready(None);
                }
                std::task::Poll::Pending => {
                    // Resume from `self.index` on the next poll; partial progress
                    // is retained in `next_batch` / `next_num_rows`
                    return std::task::Poll::Pending;
                }
            }
        }
    }
}
93
94/// Given multiple streams of batch tasks, merge them into a single stream
95///
96/// This pulls one batch from each stream and then combines the columns from
97/// all of the batches into a single batch.  The order of the batches in the
98/// streams is maintained and the merged batch columns will be in order from
99/// first to last stream.
100///
101/// This stream ends as soon as any of the input streams ends (we do not
102/// verify that the other input streams are finished as well)
103///
104/// This will panic if any of the input streams return a batch with a different
105/// number of rows than the first stream.
106pub fn merge_streams(streams: Vec<ReadBatchTaskStream>) -> ReadBatchTaskStream {
107    MergeStream {
108        streams,
109        next_batch: FuturesOrdered::new(),
110        next_num_rows: 0,
111        index: 0,
112    }
113    .boxed()
114}
115
/// Apply a mask to the batch, where rows are "deleted" by the _rowid column null.
///
/// This is used partly as a performance optimization (cheaper to null than to filter)
/// but also because there are cases where we want to load the physical rows.  For example,
/// we may be replacing a column based on some UDF and we want to provide a value for the
/// deleted rows to ensure the fragments are aligned.
///
/// Only the row id / row address columns are nulled; all other columns are
/// returned untouched.
fn apply_deletions_as_nulls(batch: RecordBatch, mask: &BooleanArray) -> Result<RecordBatch> {
    // Transform mask into null buffer. Null means deleted, though note that
    // null buffers are actually validity buffers, so True means not null
    // and thus not deleted.
    let mask_buffer = NullBuffer::new(mask.values().clone());

    if mask_buffer.null_count() == 0 {
        // No rows are deleted
        return Ok(batch);
    }

    // For each column convert to data
    let new_columns = batch
        .schema()
        .fields()
        .iter()
        .zip(batch.columns())
        .map(|(field, col)| {
            if field.name() == ROW_ID || field.name() == ROW_ADDR {
                let col_data = col.to_data();
                // If it already has a validity bitmap, then AND it with the mask.
                // Otherwise, use the boolean buffer as the mask.
                let null_buffer = NullBuffer::union(col_data.nulls(), Some(&mask_buffer));

                Ok(col_data
                    .into_builder()
                    .null_bit_buffer(null_buffer.map(|b| b.buffer().clone()))
                    .build()
                    .map(make_array)?)
            } else {
                // Non-system columns keep their values (see doc comment above)
                Ok(col.clone())
            }
        })
        .collect::<Result<Vec<_>>>()?;

    // Carry the row count through explicitly so zero-column batches stay valid
    Ok(RecordBatch::try_new_with_options(
        batch.schema(),
        new_columns,
        &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
    )?)
}
163
164/// Extract version values for a batch selection by binary-searching over
165/// precomputed RLE run offsets. Single-run fragments (the common case)
166/// take the O(1) fast path.
167fn version_values_for_selection(
168    sequence: &crate::rowids::version::RowDatasetVersionSequence,
169    params: &ReadBatchParams,
170    batch_offset: u32,
171    num_rows: u32,
172) -> Result<Vec<u64>> {
173    let selection = params
174        .slice(batch_offset as usize, num_rows as usize)
175        .unwrap()
176        .to_ranges()
177        .unwrap();
178
179    if sequence.runs.len() == 1 {
180        return Ok(vec![sequence.runs[0].version(); num_rows as usize]);
181    }
182
183    let mut versions = Vec::with_capacity(num_rows as usize);
184    let run_offsets: Vec<usize> = sequence
185        .runs
186        .iter()
187        .scan(0usize, |acc, run| {
188            let start = *acc;
189            *acc += run.len();
190            Some(start)
191        })
192        .collect();
193    let total_len: usize = sequence.runs.iter().map(|r| r.len()).sum();
194
195    for r in &selection {
196        for pos in r.start..r.end {
197            let pos = pos as usize;
198            if pos >= total_len {
199                return Err(lance_core::Error::internal(format!(
200                    "version column position {} out of range (total_len={})",
201                    pos, total_len
202                )));
203            }
204            let run_idx = match run_offsets.binary_search(&pos) {
205                Ok(idx) => idx,
206                Err(idx) => idx - 1,
207            };
208            versions.push(sequence.runs[run_idx].version());
209        }
210    }
211    Ok(versions)
212}
213
/// Configuration needed to apply row ids and deletions to a batch
#[derive(Debug)]
pub struct RowIdAndDeletesConfig {
    /// The row ids that were requested
    pub params: ReadBatchParams,
    /// Whether to include the row id column in the final batch
    pub with_row_id: bool,
    /// Whether to include the row address column in the final batch
    pub with_row_addr: bool,
    /// Whether to include the last updated at version column in the final batch
    pub with_row_last_updated_at_version: bool,
    /// Whether to include the created at version column in the final batch
    pub with_row_created_at_version: bool,
    /// An optional deletion vector to apply to the batch
    pub deletion_vector: Option<Arc<DeletionVector>>,
    /// An optional row id sequence to use for the row id column.
    ///
    /// If None, row ids are taken to be identical to row addresses.
    pub row_id_sequence: Option<Arc<RowIdSequence>>,
    /// The last_updated_at version sequence
    pub last_updated_at_sequence: Option<Arc<crate::rowids::version::RowDatasetVersionSequence>>,
    /// The created_at version sequence
    pub created_at_sequence: Option<Arc<crate::rowids::version::RowDatasetVersionSequence>>,
    /// Whether to make deleted rows null instead of filtering them out
    pub make_deletions_null: bool,
    /// The total number of rows that will be loaded
    ///
    /// This is needed to convert ReadbatchParams::RangeTo into a valid range
    pub total_num_rows: u32,
}
242
243impl RowIdAndDeletesConfig {
244    fn has_system_cols(&self) -> bool {
245        self.with_row_id
246            || self.with_row_addr
247            || self.with_row_last_updated_at_version
248            || self.with_row_created_at_version
249    }
250}
251
/// Add any requested system columns (row id / row address / version columns)
/// to a batch and apply the deletion vector, if any.
///
/// `batch_offset` is the offset of this batch within the overall read
/// described by `config.params`, and `fragment_id` is used to build row
/// addresses.  Deleted rows are either filtered out or nulled in the row
/// id / row address columns, depending on `config.make_deletions_null`.
#[instrument(level = "debug", skip_all)]
pub fn apply_row_id_and_deletes(
    batch: RecordBatch,
    batch_offset: u32,
    fragment_id: u32,
    config: &RowIdAndDeletesConfig,
) -> Result<RecordBatch> {
    let mut deletion_vector = config.deletion_vector.as_ref();
    // Convert Some(NoDeletions) into None to simplify logic below
    if let Some(deletion_vector_inner) = deletion_vector
        && matches!(deletion_vector_inner.as_ref(), DeletionVector::NoDeletions)
    {
        deletion_vector = None;
    }
    let has_deletions = deletion_vector.is_some();
    // A call with no data columns, no system columns, and no deletions does no
    // meaningful work; callers are expected to prevent this upstream.
    debug_assert!(batch.num_columns() > 0 || config.has_system_cols() || has_deletions);

    // If row id sequence is None, then row id IS row address.
    let should_fetch_row_addr = config.with_row_addr
        || (config.with_row_id && config.row_id_sequence.is_none())
        || has_deletions;

    let num_rows = batch.num_rows() as u32;

    let row_addrs =
        if should_fetch_row_addr {
            let _rowaddrs = tracing::span!(tracing::Level::DEBUG, "fetch_row_addrs").entered();
            // Build a row address (fragment id + row offset) for every row
            // position this batch covers within the read params
            let mut row_addrs = Vec::with_capacity(num_rows as usize);
            for offset_range in config
                .params
                .slice(batch_offset as usize, num_rows as usize)
                .unwrap()
                .iter_offset_ranges()?
            {
                row_addrs.extend(offset_range.map(|row_offset| {
                    u64::from(RowAddress::new_from_parts(fragment_id, row_offset))
                }));
            }

            Some(Arc::new(UInt64Array::from(row_addrs)))
        } else {
            None
        };

    let row_ids = if config.with_row_id {
        let _rowids = tracing::span!(tracing::Level::DEBUG, "fetch_row_ids").entered();
        if let Some(row_id_sequence) = &config.row_id_sequence {
            // Look the selected positions up in the fragment's row id sequence
            let selection = config
                .params
                .slice(batch_offset as usize, num_rows as usize)
                .unwrap()
                .to_ranges()
                .unwrap();
            let row_ids = row_id_sequence
                .select(
                    selection
                        .iter()
                        .flat_map(|r| r.start as usize..r.end as usize),
                )
                .collect::<UInt64Array>();
            Some(Arc::new(row_ids))
        } else {
            // If we don't have a row id sequence, can assume the row ids are
            // the same as the row addresses.
            row_addrs.clone()
        }
    } else {
        None
    };

    let span = tracing::span!(tracing::Level::DEBUG, "apply_deletions");
    let _enter = span.enter();
    // Build a keep/delete predicate from the deletion vector.  Row addresses
    // are always available here because `has_deletions` forces
    // `should_fetch_row_addr` above.
    let deletion_mask = deletion_vector.and_then(|v| {
        let row_addrs: &[u64] = row_addrs.as_ref().unwrap().values();
        v.build_predicate(row_addrs.iter())
    });

    let batch = if config.with_row_id {
        let row_id_arr = row_ids.unwrap();
        batch.try_with_column(ROW_ID_FIELD.clone(), row_id_arr)?
    } else {
        batch
    };

    let batch = if config.with_row_addr {
        let row_addr_arr = row_addrs.unwrap();
        batch.try_with_column(ROW_ADDR_FIELD.clone(), row_addr_arr)?
    } else {
        batch
    };

    // Add version columns if requested
    let batch = if config.with_row_last_updated_at_version || config.with_row_created_at_version {
        let mut batch = batch;

        if config.with_row_last_updated_at_version {
            let version_arr = if let Some(sequence) = &config.last_updated_at_sequence {
                Arc::new(UInt64Array::from(version_values_for_selection(
                    sequence,
                    &config.params,
                    batch_offset,
                    num_rows,
                )?))
            } else {
                // Default to version 1 if sequence not provided
                Arc::new(UInt64Array::from(vec![1u64; num_rows as usize]))
            };
            batch =
                batch.try_with_column(ROW_LAST_UPDATED_AT_VERSION_FIELD.clone(), version_arr)?;
        }

        if config.with_row_created_at_version {
            let version_arr = if let Some(sequence) = &config.created_at_sequence {
                Arc::new(UInt64Array::from(version_values_for_selection(
                    sequence,
                    &config.params,
                    batch_offset,
                    num_rows,
                )?))
            } else {
                // Default to version 1 if sequence not provided
                Arc::new(UInt64Array::from(vec![1u64; num_rows as usize]))
            };
            batch = batch.try_with_column(ROW_CREATED_AT_VERSION_FIELD.clone(), version_arr)?;
        }

        batch
    } else {
        batch
    };

    // Finally apply deletions: either filter the deleted rows out or null
    // them in the row id / row address columns
    match (deletion_mask, config.make_deletions_null) {
        (None, _) => Ok(batch),
        (Some(mask), false) => Ok(arrow::compute::filter_record_batch(&batch, &mask)?),
        (Some(mask), true) => Ok(apply_deletions_as_nulls(batch, &mask)?),
    }
}
389
390/// Given a stream of batch tasks this function will add a row ids column (if requested)
391/// and also apply a deletions vector to the batch.
392///
393/// This converts from BatchTaskStream to BatchFutStream because, if we are applying a
394/// deletion vector, it is impossible to know how many output rows we will have.
395pub fn wrap_with_row_id_and_delete(
396    stream: ReadBatchTaskStream,
397    fragment_id: u32,
398    config: RowIdAndDeletesConfig,
399) -> ReadBatchFutStream {
400    let config = Arc::new(config);
401    let mut offset = 0;
402    stream
403        .map(move |batch_task| {
404            let config = config.clone();
405            let this_offset = offset;
406            let num_rows = batch_task.num_rows;
407            offset += num_rows;
408            batch_task
409                .task
410                .map(move |batch| {
411                    apply_row_id_and_deletes(batch?, this_offset, fragment_id, config.as_ref())
412                })
413                .boxed()
414        })
415        .boxed()
416}
417
418#[cfg(test)]
419mod tests {
420    use std::sync::Arc;
421
422    use arrow::{array::AsArray, datatypes::UInt64Type};
423    use arrow_array::{RecordBatch, UInt32Array, types::Int32Type};
424    use arrow_schema::ArrowError;
425    use futures::{FutureExt, StreamExt, TryStreamExt, stream::BoxStream};
426    use lance_core::{
427        ROW_ID,
428        utils::{address::RowAddress, deletion::DeletionVector},
429    };
430    use lance_datagen::{BatchCount, RowCount};
431    use lance_io::{ReadBatchParams, stream::arrow_stream_to_lance_stream};
432    use roaring::RoaringBitmap;
433
434    use crate::utils::stream::ReadBatchTask;
435
436    use super::RowIdAndDeletesConfig;
437
    /// Convert a datagen arrow stream into a `ReadBatchTaskStream` whose
    /// tasks resolve immediately to their batches.
    fn batch_task_stream(
        datagen_stream: BoxStream<'static, std::result::Result<RecordBatch, ArrowError>>,
    ) -> super::ReadBatchTaskStream {
        arrow_stream_to_lance_stream(datagen_stream)
            .map(|batch| ReadBatchTask {
                num_rows: batch.as_ref().unwrap().num_rows() as u32,
                task: std::future::ready(batch).boxed(),
            })
            .boxed()
    }
448
    /// Merging two aligned single-column streams should produce the same
    /// batches as generating both columns together.
    #[tokio::test]
    async fn test_basic_zip() {
        let left = batch_task_stream(
            lance_datagen::gen_batch()
                .col("x", lance_datagen::array::step::<Int32Type>())
                .into_reader_stream(RowCount::from(100), BatchCount::from(10))
                .0,
        );
        let right = batch_task_stream(
            lance_datagen::gen_batch()
                .col("y", lance_datagen::array::step::<Int32Type>())
                .into_reader_stream(RowCount::from(100), BatchCount::from(10))
                .0,
        );

        let merged = super::merge_streams(vec![left, right])
            .map(|batch_task| batch_task.task)
            .buffered(1)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        // Reference: both columns generated in a single reader
        let expected = lance_datagen::gen_batch()
            .col("x", lance_datagen::array::step::<Int32Type>())
            .col("y", lance_datagen::array::step::<Int32Type>())
            .into_reader_rows(RowCount::from(100), BatchCount::from(10))
            .collect::<Result<Vec<_>, ArrowError>>()
            .unwrap();
        assert_eq!(merged, expected);
    }
479
    /// Run `wrap_with_row_id_and_delete` over 100 rows (10 batches of 10) and
    /// verify the generated row ids match `expected` for the given read params.
    async fn check_row_id(params: ReadBatchParams, expected: impl IntoIterator<Item = u32>) {
        let expected = Vec::from_iter(expected);

        for has_columns in [false, true] {
            for fragment_id in [0, 10] {
                // 100 rows across 10 batches of 10 rows
                let mut datagen = lance_datagen::gen_batch();
                if has_columns {
                    datagen = datagen.col("x", lance_datagen::array::rand::<Int32Type>());
                }
                let data = batch_task_stream(
                    datagen
                        .into_reader_stream(RowCount::from(10), BatchCount::from(10))
                        .0,
                );

                let config = RowIdAndDeletesConfig {
                    params: params.clone(),
                    with_row_id: true,
                    with_row_addr: false,
                    with_row_last_updated_at_version: false,
                    with_row_created_at_version: false,
                    deletion_vector: None,
                    row_id_sequence: None,
                    last_updated_at_sequence: None,
                    created_at_sequence: None,
                    make_deletions_null: false,
                    total_num_rows: 100,
                };
                let stream = super::wrap_with_row_id_and_delete(data, fragment_id, config);
                let batches = stream.buffered(1).try_collect::<Vec<_>>().await.unwrap();

                // With no row id sequence, row ids are row addresses:
                // (fragment_id, row_offset) pairs
                let mut offset = 0;
                let expected = expected.clone();
                for batch in batches {
                    let actual_row_ids =
                        batch[ROW_ID].as_primitive::<UInt64Type>().values().to_vec();
                    let expected_row_ids = expected[offset..offset + 10]
                        .iter()
                        .map(|row_offset| {
                            RowAddress::new_from_parts(fragment_id, *row_offset).into()
                        })
                        .collect::<Vec<u64>>();
                    assert_eq!(actual_row_ids, expected_row_ids);
                    offset += batch.num_rows();
                }
            }
        }
    }
529
    /// Row id generation for every supported shape of ReadBatchParams.
    #[tokio::test]
    async fn test_row_id() {
        let some_indices = (0..100).rev().collect::<Vec<u32>>();
        let some_indices_arr = UInt32Array::from(some_indices.clone());
        check_row_id(ReadBatchParams::RangeFull, 0..100).await;
        check_row_id(ReadBatchParams::Indices(some_indices_arr), some_indices).await;
        check_row_id(ReadBatchParams::Range(1000..1100), 1000..1100).await;
        check_row_id(
            ReadBatchParams::RangeFrom(std::ops::RangeFrom { start: 1000 }),
            1000..1100,
        )
        .await;
        check_row_id(
            ReadBatchParams::RangeTo(std::ops::RangeTo { end: 1000 }),
            0..100,
        )
        .await;
    }
548
    /// Matrix test of deletion handling: deletion vector kind x data columns
    /// x row id column x null-vs-filter x fragment id.
    #[tokio::test]
    async fn test_deletes() {
        let no_deletes: Option<Arc<DeletionVector>> = None;
        let no_deletes_2 = Some(Arc::new(DeletionVector::NoDeletions));
        // Bitmap and set representations both delete the first 35 rows
        let delete_some_bitmap = Some(Arc::new(DeletionVector::Bitmap(RoaringBitmap::from_iter(
            0..35,
        ))));
        let delete_some_set = Some(Arc::new(DeletionVector::Set((0..35).collect())));

        for deletion_vector in [
            no_deletes,
            no_deletes_2,
            delete_some_bitmap,
            delete_some_set,
        ] {
            for has_columns in [false, true] {
                for with_row_id in [false, true] {
                    for make_deletions_null in [false, true] {
                        for frag_id in [0, 1] {
                            let has_deletions = if let Some(dv) = &deletion_vector {
                                !matches!(dv.as_ref(), DeletionVector::NoDeletions)
                            } else {
                                false
                            };
                            if !has_columns && !has_deletions && !with_row_id {
                                // This is an invalid case and should be prevented upstream,
                                // no meaningful work is being done!
                                continue;
                            }
                            if make_deletions_null && !with_row_id {
                                // This is an invalid case and should be prevented upstream
                                // we cannot make the row_id column null if it isn't present
                                continue;
                            }

                            let mut datagen = lance_datagen::gen_batch();
                            if has_columns {
                                datagen =
                                    datagen.col("x", lance_datagen::array::rand::<Int32Type>());
                            }
                            // 100 rows across 10 batches of 10 rows
                            let data = batch_task_stream(
                                datagen
                                    .into_reader_stream(RowCount::from(10), BatchCount::from(10))
                                    .0,
                            );

                            let config = RowIdAndDeletesConfig {
                                params: ReadBatchParams::RangeFull,
                                with_row_id,
                                with_row_addr: false,
                                with_row_last_updated_at_version: false,
                                with_row_created_at_version: false,
                                deletion_vector: deletion_vector.clone(),
                                row_id_sequence: None,
                                last_updated_at_sequence: None,
                                created_at_sequence: None,
                                make_deletions_null,
                                total_num_rows: 100,
                            };
                            let stream = super::wrap_with_row_id_and_delete(data, frag_id, config);
                            // Drop batches that filtered down to zero rows
                            let batches = stream
                                .buffered(1)
                                .filter_map(|batch| {
                                    std::future::ready(
                                        batch
                                            .map(|batch| {
                                                if batch.num_rows() == 0 {
                                                    None
                                                } else {
                                                    Some(batch)
                                                }
                                            })
                                            .transpose(),
                                    )
                                })
                                .try_collect::<Vec<_>>()
                                .await
                                .unwrap();

                            // Deleted rows show up either as missing rows
                            // (filtered) or as nulls in the row id column
                            let total_num_rows =
                                batches.iter().map(|b| b.num_rows()).sum::<usize>();
                            let total_num_nulls = if make_deletions_null {
                                batches
                                    .iter()
                                    .map(|b| b[ROW_ID].null_count())
                                    .sum::<usize>()
                            } else {
                                0
                            };
                            let total_actually_deleted = total_num_nulls + (100 - total_num_rows);

                            let expected_deletions = match &deletion_vector {
                                None => 0,
                                Some(deletion_vector) => match deletion_vector.as_ref() {
                                    DeletionVector::NoDeletions => 0,
                                    DeletionVector::Bitmap(b) => b.len() as usize,
                                    DeletionVector::Set(s) => s.len(),
                                },
                            };
                            assert_eq!(total_actually_deleted, expected_deletions);
                            if expected_deletions > 0 && with_row_id {
                                if make_deletions_null {
                                    // If we make deletions null we get 3 batches of all-null and then
                                    // a batch of half-null
                                    assert_eq!(
                                        batches[3][ROW_ID].as_primitive::<UInt64Type>().value(0),
                                        u64::from(RowAddress::new_from_parts(frag_id, 30))
                                    );
                                    assert_eq!(batches[3][ROW_ID].null_count(), 5);
                                } else {
                                    // If we materialize deletions the first row will be 35
                                    assert_eq!(
                                        batches[0][ROW_ID].as_primitive::<UInt64Type>().value(0),
                                        u64::from(RowAddress::new_from_parts(frag_id, 35))
                                    );
                                }
                            }
                            if !with_row_id {
                                assert!(batches[0].column_by_name(ROW_ID).is_none());
                            }
                        }
                    }
                }
            }
        }
    }
676
    /// The created_at version column should survive deletion filtering when
    /// the fragment has a single version run (the fast path).
    #[tokio::test]
    async fn test_version_column_with_deletions() {
        use crate::rowids::segment::U64Segment;
        use crate::rowids::version::{RowDatasetVersionRun, RowDatasetVersionSequence};

        // One run covering all 100 rows at version 42
        let seq = Arc::new(RowDatasetVersionSequence {
            runs: vec![RowDatasetVersionRun {
                span: U64Segment::Range(0..100),
                version: 42,
            }],
        });

        let data = batch_task_stream(
            lance_datagen::gen_batch()
                .col("x", lance_datagen::array::rand::<Int32Type>())
                .into_reader_stream(RowCount::from(10), BatchCount::from(10))
                .0,
        );

        let config = RowIdAndDeletesConfig {
            params: ReadBatchParams::RangeFull,
            with_row_id: true,
            with_row_addr: false,
            with_row_last_updated_at_version: false,
            with_row_created_at_version: true,
            deletion_vector: Some(Arc::new(DeletionVector::Bitmap(RoaringBitmap::from_iter(
                0..35,
            )))),
            row_id_sequence: None,
            last_updated_at_sequence: None,
            created_at_sequence: Some(seq),
            make_deletions_null: false,
            total_num_rows: 100,
        };
        let stream = super::wrap_with_row_id_and_delete(data, 0, config);
        let batches: Vec<_> = stream
            .buffered(1)
            .try_filter(|b| std::future::ready(b.num_rows() > 0))
            .try_collect()
            .await
            .unwrap();

        // 35 of the 100 rows were deleted
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 65);

        // Every surviving row carries the single run's version
        for batch in &batches {
            let versions = batch
                .column_by_name("_row_created_at_version")
                .unwrap()
                .as_primitive::<UInt64Type>()
                .values();
            assert!(versions.iter().all(|&v| v == 42));
        }
    }
731
    /// The created_at version column should be correct when a deletion range
    /// spans a version-run boundary.
    #[tokio::test]
    async fn test_version_column_multi_run() {
        use crate::rowids::segment::U64Segment;
        use crate::rowids::version::{RowDatasetVersionRun, RowDatasetVersionSequence};

        // 3 runs: 0..40 v1, 40..70 v2, 70..100 v3
        let seq = Arc::new(RowDatasetVersionSequence {
            runs: vec![
                RowDatasetVersionRun {
                    span: U64Segment::Range(0..40),
                    version: 1,
                },
                RowDatasetVersionRun {
                    span: U64Segment::Range(40..70),
                    version: 2,
                },
                RowDatasetVersionRun {
                    span: U64Segment::Range(70..100),
                    version: 3,
                },
            ],
        });

        // Delete 0..20 and 60..80 (spans run boundary).
        // Survivors: 20..40 (v1), 40..60 (v2), 80..100 (v3) = 60 rows
        let mut deletions = RoaringBitmap::from_iter(0..20);
        deletions.extend(60..80);

        let data = batch_task_stream(
            lance_datagen::gen_batch()
                .col("x", lance_datagen::array::rand::<Int32Type>())
                .into_reader_stream(RowCount::from(10), BatchCount::from(10))
                .0,
        );

        let config = RowIdAndDeletesConfig {
            params: ReadBatchParams::RangeFull,
            with_row_id: true,
            with_row_addr: false,
            with_row_last_updated_at_version: false,
            with_row_created_at_version: true,
            deletion_vector: Some(Arc::new(DeletionVector::Bitmap(deletions))),
            row_id_sequence: None,
            last_updated_at_sequence: None,
            created_at_sequence: Some(seq),
            make_deletions_null: false,
            total_num_rows: 100,
        };
        let stream = super::wrap_with_row_id_and_delete(data, 0, config);
        let batches: Vec<_> = stream
            .buffered(1)
            .try_filter(|b| std::future::ready(b.num_rows() > 0))
            .try_collect()
            .await
            .unwrap();

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 60);

        // Surviving rows, in order, carry versions 1, 2, 3 in blocks of 20
        let all_versions: Vec<u64> = batches
            .iter()
            .flat_map(|b| {
                b.column_by_name("_row_created_at_version")
                    .unwrap()
                    .as_primitive::<UInt64Type>()
                    .values()
                    .to_vec()
            })
            .collect();

        assert!(all_versions[..20].iter().all(|&v| v == 1));
        assert!(all_versions[20..40].iter().all(|&v| v == 2));
        assert!(all_versions[40..60].iter().all(|&v| v == 3));
805    }
806}