use std::sync::Arc;

use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::ArrowError;
use futures::TryStreamExt;
use lance_core::{
    cache::{CapacityMode, FileMetadataCache},
    datatypes::Schema,
};
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_io::{
    object_store::ObjectStore,
    scheduler::{ScanScheduler, SchedulerConfig},
    utils::CachedFileSize,
    ReadBatchParams,
};
use object_store::path::Path;
use tempfile::TempDir;

use crate::v2::reader::{FileReader, FileReaderOptions};

use super::writer::{FileWriter, FileWriterOptions};

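/// A test fixture providing a temporary local object store, a path to a
/// single Lance file within it, and a scan scheduler configured for testing.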
pub struct FsFixture {
    /// Held so the temporary directory outlives the fixture's paths.
    _tmp_dir: TempDir,
    pub tmp_path: Path,
    pub object_store: Arc<ObjectStore>,
    pub scheduler: Arc<ScanScheduler>,
}

impl Default for FsFixture {
    fn default() -> Self {
        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path: String = tmp_dir.path().to_str().unwrap().to_owned();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_path = tmp_path.child("some_file.lance");
        let object_store = Arc::new(ObjectStore::local());
        let scheduler =
            ScanScheduler::new(object_store.clone(), SchedulerConfig::default_for_testing());
        Self {
            // Keep the TempDir so the directory isn't deleted while the
            // fixture is alive.
            _tmp_dir: tmp_dir,
            object_store,
            tmp_path,
            scheduler,
        }
    }
}

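/// The result of writing a Lance file in a test: the schema that was written,
/// the batches that were written, and the writer's mapping from field ids to
/// column indices.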
pub struct WrittenFile {
    pub schema: Arc<Schema>,
    pub data: Vec<RecordBatch>,
    pub field_id_mapping: Vec<(u32, u32)>,
}

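/// Writes `data` to the fixture's file with the given options, adds a
/// "foo" -> "bar" entry to the schema metadata, and returns what was written.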
pub async fn write_lance_file(
    data: impl RecordBatchReader,
    fs: &FsFixture,
    options: FileWriterOptions,
) -> WrittenFile {
    let writer = fs.object_store.create(&fs.tmp_path).await.unwrap();

    let lance_schema = lance_core::datatypes::Schema::try_from(data.schema().as_ref()).unwrap();

    let mut file_writer = FileWriter::try_new(writer, lance_schema.clone(), options).unwrap();

    // Collect the input up front so we can both write it and return it.
    let data = data
        .collect::<std::result::Result<Vec<_>, ArrowError>>()
        .unwrap();

    for batch in &data {
        file_writer.write_batch(batch).await.unwrap();
    }
    let field_id_mapping = file_writer.field_id_to_column_indices().to_vec();
    // Schema metadata that read_lance_file asserts on when reading back.
    file_writer.add_schema_metadata("foo", "bar");
    file_writer.finish().await.unwrap();
    WrittenFile {
        schema: Arc::new(lance_schema),
        data,
        field_id_mapping,
    }
}

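/// Creates a file metadata cache with a 128 MiB byte capacity for tests.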
pub fn test_cache() -> Arc<FileMetadataCache> {
    Arc::new(FileMetadataCache::with_capacity(
        128 * 1024 * 1024,
        CapacityMode::Bytes,
    ))
}

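/// Reads the fixture's file back into memory, asserting that the
/// "foo" -> "bar" schema metadata written by [`write_lance_file`] is present.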
pub async fn read_lance_file(
    fs: &FsFixture,
    decoder_middleware: Arc<DecoderPlugins>,
    filter: FilterExpression,
) -> Vec<RecordBatch> {
    let file_scheduler = fs
        .scheduler
        .open_file(&fs.tmp_path, &CachedFileSize::unknown())
        .await
        .unwrap();
    let file_reader = FileReader::try_open(
        file_scheduler,
        None,
        decoder_middleware,
        &test_cache(),
        FileReaderOptions::default(),
    )
    .await
    .unwrap();

    let schema = file_reader.schema();
    assert_eq!(schema.metadata.get("foo").unwrap(), "bar");

    // Read the whole file in batches of up to 1024 rows with a read-ahead of
    // 16 batches.
    let batch_stream = file_reader
        .read_stream(ReadBatchParams::RangeFull, 1024, 16, filter)
        .unwrap();

    batch_stream.try_collect().await.unwrap()
}

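/// Reads the fixture's file and returns the total number of rows.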
pub async fn count_lance_file(
    fs: &FsFixture,
    decoder_middleware: Arc<DecoderPlugins>,
    filter: FilterExpression,
) -> usize {
    read_lance_file(fs, decoder_middleware, filter)
        .await
        .iter()
        .map(|b| b.num_rows())
        .sum()
}
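
// A minimal roundtrip sketch showing how the helpers above compose. It assumes
// `FilterExpression::no_filter`, a default `DecoderPlugins`, and a tokio test
// runtime are available, as in the crate's other v2 tests.
#[cfg(test)]
mod tests {
    use super::*;

    use arrow_array::{Int32Array, RecordBatchIterator};
    use arrow_schema::{DataType, Field, Schema as ArrowSchema};

    #[tokio::test]
    async fn test_write_read_roundtrip() {
        let fs = FsFixture::default();

        // Build a single small batch to write.
        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "i",
            DataType::Int32,
            false,
        )]));
        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();
        let reader = RecordBatchIterator::new(vec![Ok(batch)], arrow_schema);

        let written = write_lance_file(reader, &fs, FileWriterOptions::default()).await;

        // Reading the file back should yield the same number of rows we wrote.
        let num_rows = count_lance_file(
            &fs,
            Arc::<DecoderPlugins>::default(),
            FilterExpression::no_filter(),
        )
        .await;
        let expected: usize = written.data.iter().map(|b| b.num_rows()).sum();
        assert_eq!(num_rows, expected);
    }
}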