1use std::sync::Arc;
5
6use arrow_array::{BooleanArray, RecordBatch, RecordBatchOptions, UInt64Array, make_array};
7use arrow_buffer::NullBuffer;
8use futures::{
9 FutureExt, Stream, StreamExt,
10 future::BoxFuture,
11 stream::{BoxStream, FuturesOrdered},
12};
13use lance_arrow::RecordBatchExt;
14use lance_core::{
15 ROW_ADDR, ROW_ADDR_FIELD, ROW_CREATED_AT_VERSION_FIELD, ROW_ID, ROW_ID_FIELD,
16 ROW_LAST_UPDATED_AT_VERSION_FIELD, Result,
17 utils::{address::RowAddress, deletion::DeletionVector},
18};
19use lance_io::ReadBatchParams;
20use tracing::instrument;
21
22use crate::rowids::RowIdSequence;
23
24pub type ReadBatchFut = BoxFuture<'static, Result<RecordBatch>>;
25pub struct ReadBatchTask {
28 pub task: ReadBatchFut,
29 pub num_rows: u32,
30}
31pub type ReadBatchTaskStream = BoxStream<'static, ReadBatchTask>;
32pub type ReadBatchFutStream = BoxStream<'static, ReadBatchFut>;
33
34struct MergeStream {
35 streams: Vec<ReadBatchTaskStream>,
36 next_batch: FuturesOrdered<ReadBatchFut>,
37 next_num_rows: u32,
38 index: usize,
39}
40
41impl MergeStream {
42 fn emit(&mut self) -> ReadBatchTask {
43 let mut iter = std::mem::take(&mut self.next_batch);
44 let task = async move {
45 let mut batch = iter.next().await.unwrap()?;
46 while let Some(next) = iter.next().await {
47 let next = next?;
48 batch = batch.merge(&next)?;
49 }
50 Ok(batch)
51 }
52 .boxed();
53 let num_rows = self.next_num_rows;
54 self.next_num_rows = 0;
55 ReadBatchTask { task, num_rows }
56 }
57}
58
59impl Stream for MergeStream {
60 type Item = ReadBatchTask;
61
62 fn poll_next(
63 mut self: std::pin::Pin<&mut Self>,
64 cx: &mut std::task::Context<'_>,
65 ) -> std::task::Poll<Option<Self::Item>> {
66 loop {
67 let index = self.index;
68 match self.streams[index].poll_next_unpin(cx) {
69 std::task::Poll::Ready(Some(batch_task)) => {
70 if self.index == 0 {
71 self.next_num_rows = batch_task.num_rows;
72 } else {
73 debug_assert_eq!(self.next_num_rows, batch_task.num_rows);
74 }
75 self.next_batch.push_back(batch_task.task);
76 self.index += 1;
77 if self.index == self.streams.len() {
78 self.index = 0;
79 let next_batch = self.emit();
80 return std::task::Poll::Ready(Some(next_batch));
81 }
82 }
83 std::task::Poll::Ready(None) => {
84 return std::task::Poll::Ready(None);
85 }
86 std::task::Poll::Pending => {
87 return std::task::Poll::Pending;
88 }
89 }
90 }
91 }
92}
93
94pub fn merge_streams(streams: Vec<ReadBatchTaskStream>) -> ReadBatchTaskStream {
107 MergeStream {
108 streams,
109 next_batch: FuturesOrdered::new(),
110 next_num_rows: 0,
111 index: 0,
112 }
113 .boxed()
114}
115
116fn apply_deletions_as_nulls(batch: RecordBatch, mask: &BooleanArray) -> Result<RecordBatch> {
123 let mask_buffer = NullBuffer::new(mask.values().clone());
127
128 if mask_buffer.null_count() == 0 {
129 return Ok(batch);
131 }
132
133 let new_columns = batch
135 .schema()
136 .fields()
137 .iter()
138 .zip(batch.columns())
139 .map(|(field, col)| {
140 if field.name() == ROW_ID || field.name() == ROW_ADDR {
141 let col_data = col.to_data();
142 let null_buffer = NullBuffer::union(col_data.nulls(), Some(&mask_buffer));
145
146 Ok(col_data
147 .into_builder()
148 .null_bit_buffer(null_buffer.map(|b| b.buffer().clone()))
149 .build()
150 .map(make_array)?)
151 } else {
152 Ok(col.clone())
153 }
154 })
155 .collect::<Result<Vec<_>>>()?;
156
157 Ok(RecordBatch::try_new_with_options(
158 batch.schema(),
159 new_columns,
160 &RecordBatchOptions::new().with_row_count(Some(batch.num_rows())),
161 )?)
162}
163
164fn version_values_for_selection(
168 sequence: &crate::rowids::version::RowDatasetVersionSequence,
169 params: &ReadBatchParams,
170 batch_offset: u32,
171 num_rows: u32,
172) -> Result<Vec<u64>> {
173 let selection = params
174 .slice(batch_offset as usize, num_rows as usize)
175 .unwrap()
176 .to_ranges()
177 .unwrap();
178
179 if sequence.runs.len() == 1 {
180 return Ok(vec![sequence.runs[0].version(); num_rows as usize]);
181 }
182
183 let mut versions = Vec::with_capacity(num_rows as usize);
184 let run_offsets: Vec<usize> = sequence
185 .runs
186 .iter()
187 .scan(0usize, |acc, run| {
188 let start = *acc;
189 *acc += run.len();
190 Some(start)
191 })
192 .collect();
193 let total_len: usize = sequence.runs.iter().map(|r| r.len()).sum();
194
195 for r in &selection {
196 for pos in r.start..r.end {
197 let pos = pos as usize;
198 if pos >= total_len {
199 return Err(lance_core::Error::internal(format!(
200 "version column position {} out of range (total_len={})",
201 pos, total_len
202 )));
203 }
204 let run_idx = match run_offsets.binary_search(&pos) {
205 Ok(idx) => idx,
206 Err(idx) => idx - 1,
207 };
208 versions.push(sequence.runs[run_idx].version());
209 }
210 }
211 Ok(versions)
212}
213
214#[derive(Debug)]
216pub struct RowIdAndDeletesConfig {
217 pub params: ReadBatchParams,
219 pub with_row_id: bool,
221 pub with_row_addr: bool,
223 pub with_row_last_updated_at_version: bool,
225 pub with_row_created_at_version: bool,
227 pub deletion_vector: Option<Arc<DeletionVector>>,
229 pub row_id_sequence: Option<Arc<RowIdSequence>>,
231 pub last_updated_at_sequence: Option<Arc<crate::rowids::version::RowDatasetVersionSequence>>,
233 pub created_at_sequence: Option<Arc<crate::rowids::version::RowDatasetVersionSequence>>,
235 pub make_deletions_null: bool,
237 pub total_num_rows: u32,
241}
242
243impl RowIdAndDeletesConfig {
244 fn has_system_cols(&self) -> bool {
245 self.with_row_id
246 || self.with_row_addr
247 || self.with_row_last_updated_at_version
248 || self.with_row_created_at_version
249 }
250}
251
252#[instrument(level = "debug", skip_all)]
253pub fn apply_row_id_and_deletes(
254 batch: RecordBatch,
255 batch_offset: u32,
256 fragment_id: u32,
257 config: &RowIdAndDeletesConfig,
258) -> Result<RecordBatch> {
259 let mut deletion_vector = config.deletion_vector.as_ref();
260 if let Some(deletion_vector_inner) = deletion_vector
262 && matches!(deletion_vector_inner.as_ref(), DeletionVector::NoDeletions)
263 {
264 deletion_vector = None;
265 }
266 let has_deletions = deletion_vector.is_some();
267 debug_assert!(batch.num_columns() > 0 || config.has_system_cols() || has_deletions);
268
269 let should_fetch_row_addr = config.with_row_addr
271 || (config.with_row_id && config.row_id_sequence.is_none())
272 || has_deletions;
273
274 let num_rows = batch.num_rows() as u32;
275
276 let row_addrs =
277 if should_fetch_row_addr {
278 let _rowaddrs = tracing::span!(tracing::Level::DEBUG, "fetch_row_addrs").entered();
279 let mut row_addrs = Vec::with_capacity(num_rows as usize);
280 for offset_range in config
281 .params
282 .slice(batch_offset as usize, num_rows as usize)
283 .unwrap()
284 .iter_offset_ranges()?
285 {
286 row_addrs.extend(offset_range.map(|row_offset| {
287 u64::from(RowAddress::new_from_parts(fragment_id, row_offset))
288 }));
289 }
290
291 Some(Arc::new(UInt64Array::from(row_addrs)))
292 } else {
293 None
294 };
295
296 let row_ids = if config.with_row_id {
297 let _rowids = tracing::span!(tracing::Level::DEBUG, "fetch_row_ids").entered();
298 if let Some(row_id_sequence) = &config.row_id_sequence {
299 let selection = config
300 .params
301 .slice(batch_offset as usize, num_rows as usize)
302 .unwrap()
303 .to_ranges()
304 .unwrap();
305 let row_ids = row_id_sequence
306 .select(
307 selection
308 .iter()
309 .flat_map(|r| r.start as usize..r.end as usize),
310 )
311 .collect::<UInt64Array>();
312 Some(Arc::new(row_ids))
313 } else {
314 row_addrs.clone()
317 }
318 } else {
319 None
320 };
321
322 let span = tracing::span!(tracing::Level::DEBUG, "apply_deletions");
323 let _enter = span.enter();
324 let deletion_mask = deletion_vector.and_then(|v| {
325 let row_addrs: &[u64] = row_addrs.as_ref().unwrap().values();
326 v.build_predicate(row_addrs.iter())
327 });
328
329 let batch = if config.with_row_id {
330 let row_id_arr = row_ids.unwrap();
331 batch.try_with_column(ROW_ID_FIELD.clone(), row_id_arr)?
332 } else {
333 batch
334 };
335
336 let batch = if config.with_row_addr {
337 let row_addr_arr = row_addrs.unwrap();
338 batch.try_with_column(ROW_ADDR_FIELD.clone(), row_addr_arr)?
339 } else {
340 batch
341 };
342
343 let batch = if config.with_row_last_updated_at_version || config.with_row_created_at_version {
345 let mut batch = batch;
346
347 if config.with_row_last_updated_at_version {
348 let version_arr = if let Some(sequence) = &config.last_updated_at_sequence {
349 Arc::new(UInt64Array::from(version_values_for_selection(
350 sequence,
351 &config.params,
352 batch_offset,
353 num_rows,
354 )?))
355 } else {
356 Arc::new(UInt64Array::from(vec![1u64; num_rows as usize]))
358 };
359 batch =
360 batch.try_with_column(ROW_LAST_UPDATED_AT_VERSION_FIELD.clone(), version_arr)?;
361 }
362
363 if config.with_row_created_at_version {
364 let version_arr = if let Some(sequence) = &config.created_at_sequence {
365 Arc::new(UInt64Array::from(version_values_for_selection(
366 sequence,
367 &config.params,
368 batch_offset,
369 num_rows,
370 )?))
371 } else {
372 Arc::new(UInt64Array::from(vec![1u64; num_rows as usize]))
374 };
375 batch = batch.try_with_column(ROW_CREATED_AT_VERSION_FIELD.clone(), version_arr)?;
376 }
377
378 batch
379 } else {
380 batch
381 };
382
383 match (deletion_mask, config.make_deletions_null) {
384 (None, _) => Ok(batch),
385 (Some(mask), false) => Ok(arrow::compute::filter_record_batch(&batch, &mask)?),
386 (Some(mask), true) => Ok(apply_deletions_as_nulls(batch, &mask)?),
387 }
388}
389
390pub fn wrap_with_row_id_and_delete(
396 stream: ReadBatchTaskStream,
397 fragment_id: u32,
398 config: RowIdAndDeletesConfig,
399) -> ReadBatchFutStream {
400 let config = Arc::new(config);
401 let mut offset = 0;
402 stream
403 .map(move |batch_task| {
404 let config = config.clone();
405 let this_offset = offset;
406 let num_rows = batch_task.num_rows;
407 offset += num_rows;
408 batch_task
409 .task
410 .map(move |batch| {
411 apply_row_id_and_deletes(batch?, this_offset, fragment_id, config.as_ref())
412 })
413 .boxed()
414 })
415 .boxed()
416}
417
418#[cfg(test)]
419mod tests {
420 use std::sync::Arc;
421
422 use arrow::{array::AsArray, datatypes::UInt64Type};
423 use arrow_array::{RecordBatch, UInt32Array, types::Int32Type};
424 use arrow_schema::ArrowError;
425 use futures::{FutureExt, StreamExt, TryStreamExt, stream::BoxStream};
426 use lance_core::{
427 ROW_ID,
428 utils::{address::RowAddress, deletion::DeletionVector},
429 };
430 use lance_datagen::{BatchCount, RowCount};
431 use lance_io::{ReadBatchParams, stream::arrow_stream_to_lance_stream};
432 use roaring::RoaringBitmap;
433
434 use crate::utils::stream::ReadBatchTask;
435
436 use super::RowIdAndDeletesConfig;
437
438 fn batch_task_stream(
439 datagen_stream: BoxStream<'static, std::result::Result<RecordBatch, ArrowError>>,
440 ) -> super::ReadBatchTaskStream {
441 arrow_stream_to_lance_stream(datagen_stream)
442 .map(|batch| ReadBatchTask {
443 num_rows: batch.as_ref().unwrap().num_rows() as u32,
444 task: std::future::ready(batch).boxed(),
445 })
446 .boxed()
447 }
448
449 #[tokio::test]
450 async fn test_basic_zip() {
451 let left = batch_task_stream(
452 lance_datagen::gen_batch()
453 .col("x", lance_datagen::array::step::<Int32Type>())
454 .into_reader_stream(RowCount::from(100), BatchCount::from(10))
455 .0,
456 );
457 let right = batch_task_stream(
458 lance_datagen::gen_batch()
459 .col("y", lance_datagen::array::step::<Int32Type>())
460 .into_reader_stream(RowCount::from(100), BatchCount::from(10))
461 .0,
462 );
463
464 let merged = super::merge_streams(vec![left, right])
465 .map(|batch_task| batch_task.task)
466 .buffered(1)
467 .try_collect::<Vec<_>>()
468 .await
469 .unwrap();
470
471 let expected = lance_datagen::gen_batch()
472 .col("x", lance_datagen::array::step::<Int32Type>())
473 .col("y", lance_datagen::array::step::<Int32Type>())
474 .into_reader_rows(RowCount::from(100), BatchCount::from(10))
475 .collect::<Result<Vec<_>, ArrowError>>()
476 .unwrap();
477 assert_eq!(merged, expected);
478 }
479
480 async fn check_row_id(params: ReadBatchParams, expected: impl IntoIterator<Item = u32>) {
481 let expected = Vec::from_iter(expected);
482
483 for has_columns in [false, true] {
484 for fragment_id in [0, 10] {
485 let mut datagen = lance_datagen::gen_batch();
487 if has_columns {
488 datagen = datagen.col("x", lance_datagen::array::rand::<Int32Type>());
489 }
490 let data = batch_task_stream(
491 datagen
492 .into_reader_stream(RowCount::from(10), BatchCount::from(10))
493 .0,
494 );
495
496 let config = RowIdAndDeletesConfig {
497 params: params.clone(),
498 with_row_id: true,
499 with_row_addr: false,
500 with_row_last_updated_at_version: false,
501 with_row_created_at_version: false,
502 deletion_vector: None,
503 row_id_sequence: None,
504 last_updated_at_sequence: None,
505 created_at_sequence: None,
506 make_deletions_null: false,
507 total_num_rows: 100,
508 };
509 let stream = super::wrap_with_row_id_and_delete(data, fragment_id, config);
510 let batches = stream.buffered(1).try_collect::<Vec<_>>().await.unwrap();
511
512 let mut offset = 0;
513 let expected = expected.clone();
514 for batch in batches {
515 let actual_row_ids =
516 batch[ROW_ID].as_primitive::<UInt64Type>().values().to_vec();
517 let expected_row_ids = expected[offset..offset + 10]
518 .iter()
519 .map(|row_offset| {
520 RowAddress::new_from_parts(fragment_id, *row_offset).into()
521 })
522 .collect::<Vec<u64>>();
523 assert_eq!(actual_row_ids, expected_row_ids);
524 offset += batch.num_rows();
525 }
526 }
527 }
528 }
529
530 #[tokio::test]
531 async fn test_row_id() {
532 let some_indices = (0..100).rev().collect::<Vec<u32>>();
533 let some_indices_arr = UInt32Array::from(some_indices.clone());
534 check_row_id(ReadBatchParams::RangeFull, 0..100).await;
535 check_row_id(ReadBatchParams::Indices(some_indices_arr), some_indices).await;
536 check_row_id(ReadBatchParams::Range(1000..1100), 1000..1100).await;
537 check_row_id(
538 ReadBatchParams::RangeFrom(std::ops::RangeFrom { start: 1000 }),
539 1000..1100,
540 )
541 .await;
542 check_row_id(
543 ReadBatchParams::RangeTo(std::ops::RangeTo { end: 1000 }),
544 0..100,
545 )
546 .await;
547 }
548
549 #[tokio::test]
550 async fn test_deletes() {
551 let no_deletes: Option<Arc<DeletionVector>> = None;
552 let no_deletes_2 = Some(Arc::new(DeletionVector::NoDeletions));
553 let delete_some_bitmap = Some(Arc::new(DeletionVector::Bitmap(RoaringBitmap::from_iter(
554 0..35,
555 ))));
556 let delete_some_set = Some(Arc::new(DeletionVector::Set((0..35).collect())));
557
558 for deletion_vector in [
559 no_deletes,
560 no_deletes_2,
561 delete_some_bitmap,
562 delete_some_set,
563 ] {
564 for has_columns in [false, true] {
565 for with_row_id in [false, true] {
566 for make_deletions_null in [false, true] {
567 for frag_id in [0, 1] {
568 let has_deletions = if let Some(dv) = &deletion_vector {
569 !matches!(dv.as_ref(), DeletionVector::NoDeletions)
570 } else {
571 false
572 };
573 if !has_columns && !has_deletions && !with_row_id {
574 continue;
577 }
578 if make_deletions_null && !with_row_id {
579 continue;
582 }
583
584 let mut datagen = lance_datagen::gen_batch();
585 if has_columns {
586 datagen =
587 datagen.col("x", lance_datagen::array::rand::<Int32Type>());
588 }
589 let data = batch_task_stream(
591 datagen
592 .into_reader_stream(RowCount::from(10), BatchCount::from(10))
593 .0,
594 );
595
596 let config = RowIdAndDeletesConfig {
597 params: ReadBatchParams::RangeFull,
598 with_row_id,
599 with_row_addr: false,
600 with_row_last_updated_at_version: false,
601 with_row_created_at_version: false,
602 deletion_vector: deletion_vector.clone(),
603 row_id_sequence: None,
604 last_updated_at_sequence: None,
605 created_at_sequence: None,
606 make_deletions_null,
607 total_num_rows: 100,
608 };
609 let stream = super::wrap_with_row_id_and_delete(data, frag_id, config);
610 let batches = stream
611 .buffered(1)
612 .filter_map(|batch| {
613 std::future::ready(
614 batch
615 .map(|batch| {
616 if batch.num_rows() == 0 {
617 None
618 } else {
619 Some(batch)
620 }
621 })
622 .transpose(),
623 )
624 })
625 .try_collect::<Vec<_>>()
626 .await
627 .unwrap();
628
629 let total_num_rows =
630 batches.iter().map(|b| b.num_rows()).sum::<usize>();
631 let total_num_nulls = if make_deletions_null {
632 batches
633 .iter()
634 .map(|b| b[ROW_ID].null_count())
635 .sum::<usize>()
636 } else {
637 0
638 };
639 let total_actually_deleted = total_num_nulls + (100 - total_num_rows);
640
641 let expected_deletions = match &deletion_vector {
642 None => 0,
643 Some(deletion_vector) => match deletion_vector.as_ref() {
644 DeletionVector::NoDeletions => 0,
645 DeletionVector::Bitmap(b) => b.len() as usize,
646 DeletionVector::Set(s) => s.len(),
647 },
648 };
649 assert_eq!(total_actually_deleted, expected_deletions);
650 if expected_deletions > 0 && with_row_id {
651 if make_deletions_null {
652 assert_eq!(
655 batches[3][ROW_ID].as_primitive::<UInt64Type>().value(0),
656 u64::from(RowAddress::new_from_parts(frag_id, 30))
657 );
658 assert_eq!(batches[3][ROW_ID].null_count(), 5);
659 } else {
660 assert_eq!(
662 batches[0][ROW_ID].as_primitive::<UInt64Type>().value(0),
663 u64::from(RowAddress::new_from_parts(frag_id, 35))
664 );
665 }
666 }
667 if !with_row_id {
668 assert!(batches[0].column_by_name(ROW_ID).is_none());
669 }
670 }
671 }
672 }
673 }
674 }
675 }
676
677 #[tokio::test]
678 async fn test_version_column_with_deletions() {
679 use crate::rowids::segment::U64Segment;
680 use crate::rowids::version::{RowDatasetVersionRun, RowDatasetVersionSequence};
681
682 let seq = Arc::new(RowDatasetVersionSequence {
683 runs: vec![RowDatasetVersionRun {
684 span: U64Segment::Range(0..100),
685 version: 42,
686 }],
687 });
688
689 let data = batch_task_stream(
690 lance_datagen::gen_batch()
691 .col("x", lance_datagen::array::rand::<Int32Type>())
692 .into_reader_stream(RowCount::from(10), BatchCount::from(10))
693 .0,
694 );
695
696 let config = RowIdAndDeletesConfig {
697 params: ReadBatchParams::RangeFull,
698 with_row_id: true,
699 with_row_addr: false,
700 with_row_last_updated_at_version: false,
701 with_row_created_at_version: true,
702 deletion_vector: Some(Arc::new(DeletionVector::Bitmap(RoaringBitmap::from_iter(
703 0..35,
704 )))),
705 row_id_sequence: None,
706 last_updated_at_sequence: None,
707 created_at_sequence: Some(seq),
708 make_deletions_null: false,
709 total_num_rows: 100,
710 };
711 let stream = super::wrap_with_row_id_and_delete(data, 0, config);
712 let batches: Vec<_> = stream
713 .buffered(1)
714 .try_filter(|b| std::future::ready(b.num_rows() > 0))
715 .try_collect()
716 .await
717 .unwrap();
718
719 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
720 assert_eq!(total_rows, 65);
721
722 for batch in &batches {
723 let versions = batch
724 .column_by_name("_row_created_at_version")
725 .unwrap()
726 .as_primitive::<UInt64Type>()
727 .values();
728 assert!(versions.iter().all(|&v| v == 42));
729 }
730 }
731
732 #[tokio::test]
733 async fn test_version_column_multi_run() {
734 use crate::rowids::segment::U64Segment;
735 use crate::rowids::version::{RowDatasetVersionRun, RowDatasetVersionSequence};
736
737 let seq = Arc::new(RowDatasetVersionSequence {
739 runs: vec![
740 RowDatasetVersionRun {
741 span: U64Segment::Range(0..40),
742 version: 1,
743 },
744 RowDatasetVersionRun {
745 span: U64Segment::Range(40..70),
746 version: 2,
747 },
748 RowDatasetVersionRun {
749 span: U64Segment::Range(70..100),
750 version: 3,
751 },
752 ],
753 });
754
755 let mut deletions = RoaringBitmap::from_iter(0..20);
758 deletions.extend(60..80);
759
760 let data = batch_task_stream(
761 lance_datagen::gen_batch()
762 .col("x", lance_datagen::array::rand::<Int32Type>())
763 .into_reader_stream(RowCount::from(10), BatchCount::from(10))
764 .0,
765 );
766
767 let config = RowIdAndDeletesConfig {
768 params: ReadBatchParams::RangeFull,
769 with_row_id: true,
770 with_row_addr: false,
771 with_row_last_updated_at_version: false,
772 with_row_created_at_version: true,
773 deletion_vector: Some(Arc::new(DeletionVector::Bitmap(deletions))),
774 row_id_sequence: None,
775 last_updated_at_sequence: None,
776 created_at_sequence: Some(seq),
777 make_deletions_null: false,
778 total_num_rows: 100,
779 };
780 let stream = super::wrap_with_row_id_and_delete(data, 0, config);
781 let batches: Vec<_> = stream
782 .buffered(1)
783 .try_filter(|b| std::future::ready(b.num_rows() > 0))
784 .try_collect()
785 .await
786 .unwrap();
787
788 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
789 assert_eq!(total_rows, 60);
790
791 let all_versions: Vec<u64> = batches
792 .iter()
793 .flat_map(|b| {
794 b.column_by_name("_row_created_at_version")
795 .unwrap()
796 .as_primitive::<UInt64Type>()
797 .values()
798 .to_vec()
799 })
800 .collect();
801
802 assert!(all_versions[..20].iter().all(|&v| v == 1));
803 assert!(all_versions[20..40].iter().all(|&v| v == 2));
804 assert!(all_versions[40..60].iter().all(|&v| v == 3));
805 }
806}