lance_file/v2/testing.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::sync::Arc;

use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::ArrowError;
use futures::TryStreamExt;
use lance_core::{cache::LanceCache, datatypes::Schema};
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_io::{
    object_store::ObjectStore,
    scheduler::{ScanScheduler, SchedulerConfig},
    utils::CachedFileSize,
    ReadBatchParams,
};
use object_store::path::Path;
use tempfile::TempDir;

use crate::v2::reader::{FileReader, FileReaderOptions};

use super::writer::{FileWriter, FileWriterOptions};

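/// Filesystem fixture shared by the v2 reader/writer tests: a temporary
/// directory, a local [`ObjectStore`], a [`ScanScheduler`], and the path of
/// the single Lance file under test. The [`TempDir`] is held only so the
/// directory outlives the fixture.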
pub struct FsFixture {
    _tmp_dir: TempDir,
    pub tmp_path: Path,
    pub object_store: Arc<ObjectStore>,
    pub scheduler: Arc<ScanScheduler>,
}

impl Default for FsFixture {
    fn default() -> Self {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let object_store = Arc::new(ObjectStore::local());
        let scheduler =
            ScanScheduler::new(object_store.clone(), SchedulerConfig::default_for_testing());
        Self {
            _tmp_dir: tmp_dir,
            object_store,
            tmp_path,
            scheduler,
        }
    }
}

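/// What [`write_lance_file`] produced: the Lance schema that was written, the
/// batches that went into the file, and the writer's field id to column index
/// mapping.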
pub struct WrittenFile {
    pub schema: Arc<Schema>,
    pub data: Vec<RecordBatch>,
    pub field_id_mapping: Vec<(u32, u32)>,
}

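/// Writes all batches from `data` to `fs.tmp_path` with the given options,
/// adding a `"foo" -> "bar"` entry to the schema metadata so readers can
/// check that metadata round-trips.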
pub async fn write_lance_file(
    data: impl RecordBatchReader,
    fs: &FsFixture,
    options: FileWriterOptions,
) -> WrittenFile {
    let writer = fs.object_store.create(&fs.tmp_path).await.unwrap();

    let lance_schema = lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap();

    let mut file_writer = FileWriter::try_new(writer, lance_schema.clone(), options).unwrap();

    let data = data
        .collect::<std::result::Result<Vec<_>, ArrowError>>()
        .unwrap();

    for batch in &data {
        file_writer.write_batch(batch).await.unwrap();
    }
    let field_id_mapping = file_writer.field_id_to_column_indices().to_vec();
    file_writer.add_schema_metadata("foo", "bar");
    file_writer.finish().await.unwrap();
    WrittenFile {
        schema: Arc::new(lance_schema),
        data,
        field_id_mapping,
    }
}

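/// Builds a [`LanceCache`] with a `128 * 1024 * 1024` capacity for the test
/// readers.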
pub fn test_cache() -> Arc<LanceCache> {
    Arc::new(LanceCache::with_capacity(128 * 1024 * 1024))
}

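/// Opens the file written at `fs.tmp_path`, verifies the `"foo" -> "bar"`
/// schema metadata added by [`write_lance_file`], and collects the decoded
/// stream (batch size 1024, read-ahead 16) into a `Vec`.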
pub async fn read_lance_file(
    fs: &FsFixture,
    decoder_middleware: Arc<DecoderPlugins>,
    filter: FilterExpression,
) -> Vec<RecordBatch> {
    let file_scheduler = fs
        .scheduler
        .open_file(&fs.tmp_path, &CachedFileSize::unknown())
        .await
        .unwrap();
    let file_reader = FileReader::try_open(
        file_scheduler,
        None,
        decoder_middleware,
        &test_cache(),
        FileReaderOptions::default(),
    )
    .await
    .unwrap();

    let schema = file_reader.schema();
    assert_eq!(schema.metadata.get("foo").unwrap(), "bar");

    let batch_stream = file_reader
        .read_stream(ReadBatchParams::RangeFull, 1024, 16, filter)
        .unwrap();

    batch_stream.try_collect().await.unwrap()
}

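/// Convenience wrapper around [`read_lance_file`] that returns the total row
/// count across all decoded batches.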
pub async fn count_lance_file(
    fs: &FsFixture,
    decoder_middleware: Arc<DecoderPlugins>,
    filter: FilterExpression,
) -> usize {
    read_lance_file(fs, decoder_middleware, filter)
        .await
        .iter()
        .map(|b| b.num_rows())
        .sum()
}
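
// A minimal round-trip sketch showing how the helpers above are typically
// combined. It assumes `tokio` is available as a dev-dependency for the async
// test runtime; the single Int32 batch is illustrative only, and any
// `RecordBatchReader` works.
#[cfg(test)]
mod example {
    use super::*;
    use arrow_array::{Int32Array, RecordBatchIterator};
    use arrow_schema::{DataType, Field, Schema as ArrowSchema};

    #[tokio::test]
    async fn round_trip_counts_rows() {
        let fs = FsFixture::default();

        // One batch, one non-nullable Int32 column with three rows.
        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "x",
            DataType::Int32,
            false,
        )]));
        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();
        let reader = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema);

        // Write the file, then read it back with default decoder plugins and
        // no filter, counting the rows that come out.
        let written = write_lance_file(reader, &fs, FileWriterOptions::default()).await;
        let num_rows = count_lance_file(
            &fs,
            Arc::new(DecoderPlugins::default()),
            FilterExpression::no_filter(),
        )
        .await;

        assert_eq!(written.data.len(), 1);
        assert_eq!(num_rows, 3);
    }
}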