use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

use apiary_core::{ApiaryError, CellMetadata, Result, StorageBackend};

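/// Reads the Parquet-encoded cells of a frame from a [`StorageBackend`].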
pub struct CellReader {
    storage: Arc<dyn StorageBackend>,
    frame_path: String,
}

impl CellReader {
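    /// Creates a reader for the frame stored under `frame_path` on the given backend.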
    pub fn new(storage: Arc<dyn StorageBackend>, frame_path: String) -> Self {
        Self {
            storage,
            frame_path,
        }
    }

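    /// Reads a single cell's Parquet data from storage and decodes it into Arrow
    /// record batches. When `projection` is provided, only the named columns are
    /// included in the output.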
    pub async fn read_cell(
        &self,
        cell: &CellMetadata,
        projection: Option<&[String]>,
    ) -> Result<Vec<RecordBatch>> {
        let storage_key = format!("{}/{}", self.frame_path, cell.path);
        let data = self.storage.get(&storage_key).await?;

        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(
            data.to_vec(),
        ))
        .map_err(|e| ApiaryError::Storage {
            message: format!("Failed to open Parquet reader: {}", e),
            source: None,
        })?;

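        // Map requested column names to root column indices in the Parquet schema.
        // Names that are not present in the file are silently skipped rather than erroring.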
        let reader = if let Some(cols) = projection {
            let parquet_schema = reader_builder.schema().clone();
            let indices: Vec<usize> = cols
                .iter()
                .filter_map(|col_name| parquet_schema.index_of(col_name).ok())
                .collect();

            let mask =
                parquet::arrow::ProjectionMask::roots(reader_builder.parquet_schema(), indices);
            reader_builder
                .with_projection(mask)
                .build()
                .map_err(|e| ApiaryError::Storage {
                    message: format!("Failed to build Parquet reader: {}", e),
                    source: None,
                })?
        } else {
            reader_builder.build().map_err(|e| ApiaryError::Storage {
                message: format!("Failed to build Parquet reader: {}", e),
                source: None,
            })?
        };

        let mut batches = Vec::new();
        for batch_result in reader {
            let batch = batch_result.map_err(|e| ApiaryError::Storage {
                message: format!("Failed to read Parquet batch: {}", e),
                source: None,
            })?;
            batches.push(batch);
        }

        Ok(batches)
    }

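    /// Reads multiple cells sequentially, returning their record batches in cell order.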
    pub async fn read_cells(
        &self,
        cells: &[&CellMetadata],
        projection: Option<&[String]>,
    ) -> Result<Vec<RecordBatch>> {
        let mut all_batches = Vec::new();

        for cell in cells {
            let batches = self.read_cell(cell, projection).await?;
            all_batches.extend(batches);
        }

        Ok(all_batches)
    }

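    /// Reads multiple cells and concatenates everything into a single record batch.
    /// Returns `Ok(None)` when the cells contain no batches.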
    pub async fn read_cells_merged(
        &self,
        cells: &[&CellMetadata],
        projection: Option<&[String]>,
    ) -> Result<Option<RecordBatch>> {
        let batches = self.read_cells(cells, projection).await?;

        if batches.is_empty() {
            return Ok(None);
        }

        let schema = batches[0].schema();
        concat_batches(&schema, &batches)
    }
}

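/// Concatenates record batches sharing `schema` into one batch; returns `None`
/// when the input is empty.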
fn concat_batches(schema: &Arc<Schema>, batches: &[RecordBatch]) -> Result<Option<RecordBatch>> {
    if batches.is_empty() {
        return Ok(None);
    }

    let merged =
        arrow::compute::concat_batches(schema, batches).map_err(|e| ApiaryError::Internal {
            message: format!("Failed to concatenate record batches: {}", e),
        })?;

    Ok(Some(merged))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cell_writer::CellWriter;
    use crate::local::LocalBackend;
    use apiary_core::{CellSizingPolicy, FieldDef, FrameSchema};
    use arrow::array::*;
    use arrow::datatypes::{DataType, Field};

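    // Builds a LocalBackend rooted in a fresh temp dir; the TempDir is returned so it
    // stays alive (and on disk) for the duration of the test.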
    async fn make_storage() -> (Arc<dyn StorageBackend>, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let backend = LocalBackend::new(dir.path().to_path_buf()).await.unwrap();
        (Arc::new(backend), dir)
    }

    fn test_schema() -> FrameSchema {
        FrameSchema {
            fields: vec![
                FieldDef {
                    name: "name".into(),
                    data_type: "string".into(),
                    nullable: false,
                },
                FieldDef {
                    name: "value".into(),
                    data_type: "float64".into(),
                    nullable: true,
                },
            ],
        }
    }

    fn test_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
        ]));

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["alpha", "beta", "gamma"])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap()
    }

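    // Writes a single batch through CellWriter, then reads the resulting cell back and
    // checks that all three rows survive the round trip.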
    #[tokio::test]
    async fn test_read_cell_roundtrip() {
        let (storage, _dir) = make_storage().await;
        let frame_path = "test_hive/test_box/test_frame";
        let schema = test_schema();
        let sizing = CellSizingPolicy::new(256 * 1024 * 1024, 512 * 1024 * 1024, 16 * 1024 * 1024);

        let writer = CellWriter::new(storage.clone(), frame_path.into(), schema, vec![], sizing);

        let batch = test_batch();
        let cells = writer.write(&batch).await.unwrap();
        assert_eq!(cells.len(), 1);

        let reader = CellReader::new(storage, frame_path.into());
        let read_batches = reader.read_cell(&cells[0], None).await.unwrap();
        assert_eq!(read_batches.len(), 1);
        assert_eq!(read_batches[0].num_rows(), 3);
    }

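    // Reading with a projection of just "name" should yield a single-column batch.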
    #[tokio::test]
    async fn test_read_cell_with_projection() {
        let (storage, _dir) = make_storage().await;
        let frame_path = "test_hive/test_box/test_frame";
        let schema = test_schema();
        let sizing = CellSizingPolicy::new(256 * 1024 * 1024, 512 * 1024 * 1024, 16 * 1024 * 1024);

        let writer = CellWriter::new(storage.clone(), frame_path.into(), schema, vec![], sizing);

        let batch = test_batch();
        let cells = writer.write(&batch).await.unwrap();

        let reader = CellReader::new(storage, frame_path.into());
        let projection = vec!["name".to_string()];
        let read_batches = reader
            .read_cell(&cells[0], Some(&projection))
            .await
            .unwrap();

        assert_eq!(read_batches[0].num_columns(), 1);
        assert_eq!(read_batches[0].schema().field(0).name(), "name");
    }
}
220}