// lance_file/v2/testing.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::sync::Arc;
5
6use arrow_array::{RecordBatch, RecordBatchReader};
7use arrow_schema::ArrowError;
8use futures::TryStreamExt;
9use lance_core::{
10    cache::{CapacityMode, FileMetadataCache},
11    datatypes::Schema,
12};
13use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
14use lance_io::{
15    object_store::ObjectStore,
16    scheduler::{ScanScheduler, SchedulerConfig},
17    utils::CachedFileSize,
18    ReadBatchParams,
19};
20use object_store::path::Path;
21use tempfile::TempDir;
22
23use crate::v2::reader::{FileReader, FileReaderOptions};
24
25use super::writer::{FileWriter, FileWriterOptions};
26
/// Filesystem fixture for tests: a temp directory holding a single lance
/// file path, plus the object store and scan scheduler needed to read it.
pub struct FsFixture {
    // Held only so the temp directory is deleted when the fixture drops.
    _tmp_dir: TempDir,
    /// Path of the test file (`some_file.lance`) inside the temp directory.
    pub tmp_path: Path,
    /// Local object store rooted at the filesystem.
    pub object_store: Arc<ObjectStore>,
    /// Scheduler (with testing defaults) used to open/scan the test file.
    pub scheduler: Arc<ScanScheduler>,
}
33
34impl Default for FsFixture {
35    fn default() -> Self {
36        let tmp_dir = tempfile::tempdir().unwrap();
37        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
38        let tmp_path = Path::parse(tmp_path).unwrap();
39        let tmp_path = tmp_path.child("some_file.lance");
40        let object_store = Arc::new(ObjectStore::local());
41        let scheduler =
42            ScanScheduler::new(object_store.clone(), SchedulerConfig::default_for_testing());
43        Self {
44            _tmp_dir: tmp_dir,
45            object_store,
46            tmp_path,
47            scheduler,
48        }
49    }
50}
51
/// Result of [`write_lance_file`]: everything a test needs to validate a
/// round-trip read against what was written.
pub struct WrittenFile {
    /// Lance schema derived from the input reader's Arrow schema.
    pub schema: Arc<Schema>,
    /// The exact batches that were written, in order.
    pub data: Vec<RecordBatch>,
    /// Mapping of field id -> column index reported by the writer.
    pub field_id_mapping: Vec<(u32, u32)>,
}
57
58pub async fn write_lance_file(
59    data: impl RecordBatchReader,
60    fs: &FsFixture,
61    options: FileWriterOptions,
62) -> WrittenFile {
63    let writer = fs.object_store.create(&fs.tmp_path).await.unwrap();
64
65    let lance_schema = lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap();
66
67    let mut file_writer = FileWriter::try_new(writer, lance_schema.clone(), options).unwrap();
68
69    let data = data
70        .collect::<std::result::Result<Vec<_>, ArrowError>>()
71        .unwrap();
72
73    for batch in &data {
74        file_writer.write_batch(batch).await.unwrap();
75    }
76    let field_id_mapping = file_writer.field_id_to_column_indices().to_vec();
77    file_writer.add_schema_metadata("foo", "bar");
78    file_writer.finish().await.unwrap();
79    WrittenFile {
80        schema: Arc::new(lance_schema),
81        data,
82        field_id_mapping,
83    }
84}
85
86pub fn test_cache() -> Arc<FileMetadataCache> {
87    Arc::new(FileMetadataCache::with_capacity(
88        128 * 1024 * 1024,
89        CapacityMode::Bytes,
90    ))
91}
92
93pub async fn read_lance_file(
94    fs: &FsFixture,
95    decoder_middleware: Arc<DecoderPlugins>,
96    filter: FilterExpression,
97) -> Vec<RecordBatch> {
98    let file_scheduler = fs
99        .scheduler
100        .open_file(&fs.tmp_path, &CachedFileSize::unknown())
101        .await
102        .unwrap();
103    let file_reader = FileReader::try_open(
104        file_scheduler,
105        None,
106        decoder_middleware,
107        &test_cache(),
108        FileReaderOptions::default(),
109    )
110    .await
111    .unwrap();
112
113    let schema = file_reader.schema();
114    assert_eq!(schema.metadata.get("foo").unwrap(), "bar");
115
116    let batch_stream = file_reader
117        .read_stream(ReadBatchParams::RangeFull, 1024, 16, filter)
118        .unwrap();
119
120    batch_stream.try_collect().await.unwrap()
121}
122
123pub async fn count_lance_file(
124    fs: &FsFixture,
125    decoder_middleware: Arc<DecoderPlugins>,
126    filter: FilterExpression,
127) -> usize {
128    read_lance_file(fs, decoder_middleware, filter)
129        .await
130        .iter()
131        .map(|b| b.num_rows())
132        .sum()
133}