lance_file/testing.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::ArrowError;
use futures::TryStreamExt;
use lance_core::{cache::LanceCache, datatypes::Schema, utils::tempfile::TempObjFile};
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_io::{
    object_store::ObjectStore,
    scheduler::{ScanScheduler, SchedulerConfig},
    utils::CachedFileSize,
    ReadBatchParams,
};

use crate::reader::{FileReader, FileReaderOptions};
use crate::writer::{FileWriter, FileWriterOptions};

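/// Test fixture that pairs a temporary file path with a local [`ObjectStore`]
/// and a [`ScanScheduler`], so writer/reader tests can share one filesystem setup.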
pub struct FsFixture {
    pub tmp_path: TempObjFile,
    pub object_store: Arc<ObjectStore>,
    pub scheduler: Arc<ScanScheduler>,
}

impl Default for FsFixture {
    fn default() -> Self {
        let tmp_path = TempObjFile::default();
        let object_store = Arc::new(ObjectStore::local());
        let scheduler =
            ScanScheduler::new(object_store.clone(), SchedulerConfig::default_for_testing());
        Self {
            object_store,
            tmp_path,
            scheduler,
        }
    }
}

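/// Result of [`write_lance_file`]: the Lance schema, the batches that were written,
/// and the writer's mapping from field ids to column indices.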
pub struct WrittenFile {
    pub schema: Arc<Schema>,
    pub data: Vec<RecordBatch>,
    pub field_id_mapping: Vec<(u32, u32)>,
}

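/// Writes every batch from `data` to the fixture's temporary path, adds a
/// `"foo" -> "bar"` schema metadata entry, and returns what was written.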
pub async fn write_lance_file(
    data: impl RecordBatchReader,
    fs: &FsFixture,
    options: FileWriterOptions,
) -> WrittenFile {
    let writer = fs.object_store.create(&fs.tmp_path).await.unwrap();

    let lance_schema = lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap();

    let mut file_writer = FileWriter::try_new(writer, lance_schema.clone(), options).unwrap();

    let data = data
        .collect::<std::result::Result<Vec<_>, ArrowError>>()
        .unwrap();

    for batch in &data {
        file_writer.write_batch(batch).await.unwrap();
    }
    let field_id_mapping = file_writer.field_id_to_column_indices().to_vec();
    file_writer.add_schema_metadata("foo", "bar");
    file_writer.finish().await.unwrap();
    WrittenFile {
        schema: Arc::new(lance_schema),
        data,
        field_id_mapping,
    }
}

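/// Returns a 128 MiB [`LanceCache`] for tests.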
pub fn test_cache() -> Arc<LanceCache> {
    Arc::new(LanceCache::with_capacity(128 * 1024 * 1024))
}

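/// Reads the fixture's file back with the given decoder plugins and filter,
/// asserting that the `"foo" -> "bar"` metadata written by [`write_lance_file`]
/// survived the round trip, and collects the stream into batches of up to 1024 rows.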
pub async fn read_lance_file(
    fs: &FsFixture,
    decoder_middleware: Arc<DecoderPlugins>,
    filter: FilterExpression,
) -> Vec<RecordBatch> {
    let file_scheduler = fs
        .scheduler
        .open_file(&fs.tmp_path, &CachedFileSize::unknown())
        .await
        .unwrap();
    let file_reader = FileReader::try_open(
        file_scheduler,
        None,
        decoder_middleware,
        &test_cache(),
        FileReaderOptions::default(),
    )
    .await
    .unwrap();

    let schema = file_reader.schema();
    assert_eq!(schema.metadata.get("foo").unwrap(), "bar");

    let batch_stream = file_reader
        .read_stream(ReadBatchParams::RangeFull, 1024, 16, filter)
        .unwrap();

    batch_stream.try_collect().await.unwrap()
}

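/// Reads the fixture's file back and returns the total row count across all batches.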
pub async fn count_lance_file(
    fs: &FsFixture,
    decoder_middleware: Arc<DecoderPlugins>,
    filter: FilterExpression,
) -> usize {
    read_lance_file(fs, decoder_middleware, filter)
        .await
        .iter()
        .map(|b| b.num_rows())
        .sum()
}