lance_file/v2/
testing.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use arrow_array::{RecordBatch, RecordBatchReader};
7use arrow_schema::ArrowError;
8use futures::TryStreamExt;
9use lance_core::{cache::LanceCache, datatypes::Schema, utils::tempfile::TempObjFile};
10use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
11use lance_io::{
12    object_store::ObjectStore,
13    scheduler::{ScanScheduler, SchedulerConfig},
14    utils::CachedFileSize,
15    ReadBatchParams,
16};
17
18use crate::v2::reader::{FileReader, FileReaderOptions};
19
20use super::writer::{FileWriter, FileWriterOptions};
21
/// A temporary filesystem fixture for Lance v2 file reader/writer tests.
///
/// Bundles a temp file path, a local object store, and a scan scheduler
/// so tests can write and then read back a Lance file without setup code.
pub struct FsFixture {
    /// Path of the temporary file the fixture writes to and reads from.
    pub tmp_path: TempObjFile,
    /// Local-filesystem object store used for all I/O in the fixture.
    pub object_store: Arc<ObjectStore>,
    /// Scheduler used to open and scan files from `object_store`.
    pub scheduler: Arc<ScanScheduler>,
}
27
28impl Default for FsFixture {
29    fn default() -> Self {
30        let tmp_path = TempObjFile::default();
31        let object_store = Arc::new(ObjectStore::local());
32        let scheduler =
33            ScanScheduler::new(object_store.clone(), SchedulerConfig::default_for_testing());
34        Self {
35            object_store,
36            tmp_path,
37            scheduler,
38        }
39    }
40}
41
/// The result of writing a Lance file in a test: everything a caller
/// needs to validate a subsequent read of the same file.
pub struct WrittenFile {
    /// The Lance schema the data was written with.
    pub schema: Arc<Schema>,
    /// The record batches that were written, in write order.
    pub data: Vec<RecordBatch>,
    /// Field-id to column-index pairs as reported by the writer.
    pub field_id_mapping: Vec<(u32, u32)>,
}
47
48pub async fn write_lance_file(
49    data: impl RecordBatchReader,
50    fs: &FsFixture,
51    options: FileWriterOptions,
52) -> WrittenFile {
53    let writer = fs.object_store.create(&fs.tmp_path).await.unwrap();
54
55    let lance_schema = lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap();
56
57    let mut file_writer = FileWriter::try_new(writer, lance_schema.clone(), options).unwrap();
58
59    let data = data
60        .collect::<std::result::Result<Vec<_>, ArrowError>>()
61        .unwrap();
62
63    for batch in &data {
64        file_writer.write_batch(batch).await.unwrap();
65    }
66    let field_id_mapping = file_writer.field_id_to_column_indices().to_vec();
67    file_writer.add_schema_metadata("foo", "bar");
68    file_writer.finish().await.unwrap();
69    WrittenFile {
70        schema: Arc::new(lance_schema),
71        data,
72        field_id_mapping,
73    }
74}
75
76pub fn test_cache() -> Arc<LanceCache> {
77    Arc::new(LanceCache::with_capacity(128 * 1024 * 1024))
78}
79
80pub async fn read_lance_file(
81    fs: &FsFixture,
82    decoder_middleware: Arc<DecoderPlugins>,
83    filter: FilterExpression,
84) -> Vec<RecordBatch> {
85    let file_scheduler = fs
86        .scheduler
87        .open_file(&fs.tmp_path, &CachedFileSize::unknown())
88        .await
89        .unwrap();
90    let file_reader = FileReader::try_open(
91        file_scheduler,
92        None,
93        decoder_middleware,
94        &test_cache(),
95        FileReaderOptions::default(),
96    )
97    .await
98    .unwrap();
99
100    let schema = file_reader.schema();
101    assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
102
103    let batch_stream = file_reader
104        .read_stream(ReadBatchParams::RangeFull, 1024, 16, filter)
105        .unwrap();
106
107    batch_stream.try_collect().await.unwrap()
108}
109
110pub async fn count_lance_file(
111    fs: &FsFixture,
112    decoder_middleware: Arc<DecoderPlugins>,
113    filter: FilterExpression,
114) -> usize {
115    read_lance_file(fs, decoder_middleware, filter)
116        .await
117        .iter()
118        .map(|b| b.num_rows())
119        .sum()
120}