apiary_storage/cell_reader.rs

//! Cell reader for loading Parquet files from object storage.
//!
//! Supports projection pushdown (reading only requested columns) and
//! reading cells back as Arrow RecordBatches.
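//!
//! A minimal usage sketch; `storage` (an `Arc<dyn StorageBackend>`) and `cell`
//! (a `CellMetadata`) are assumptions here and would normally come from the
//! frame's manifest:
//!
//! ```ignore
//! let reader = CellReader::new(storage, "my_hive/my_box/my_frame".to_string());
//! // Read every column of one cell back as Arrow RecordBatches.
//! let batches = reader.read_cell(&cell, None).await?;
//! ```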

use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

use apiary_core::{ApiaryError, CellMetadata, Result, StorageBackend};

/// Reads Parquet cells from object storage into Arrow RecordBatches.
pub struct CellReader {
    storage: Arc<dyn StorageBackend>,
    frame_path: String,
}

impl CellReader {
    /// Create a new CellReader.
    pub fn new(storage: Arc<dyn StorageBackend>, frame_path: String) -> Self {
        Self {
            storage,
            frame_path,
        }
    }

    /// Read a single cell from storage. Returns all record batches in the cell.
    ///
    /// If `projection` is Some, only the specified columns are read (projection pushdown).
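    ///
    /// Column names that are not present in the cell's schema are silently
    /// skipped rather than producing an error.
    ///
    /// A sketch of a projected read, assuming a `reader: CellReader` and a
    /// `cell: CellMetadata` obtained elsewhere:
    ///
    /// ```ignore
    /// let projection = vec!["name".to_string()];
    /// // Only the "name" column is decoded from the Parquet data.
    /// let batches = reader.read_cell(&cell, Some(&projection)).await?;
    /// ```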
    pub async fn read_cell(
        &self,
        cell: &CellMetadata,
        projection: Option<&[String]>,
    ) -> Result<Vec<RecordBatch>> {
        let storage_key = format!("{}/{}", self.frame_path, cell.path);
        let data = self.storage.get(&storage_key).await?;

        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(bytes::Bytes::from(
            data.to_vec(),
        ))
        .map_err(|e| ApiaryError::Storage {
            message: format!("Failed to open Parquet reader: {}", e),
            source: None,
        })?;

        // Apply projection if specified
        let reader = if let Some(cols) = projection {
            let parquet_schema = reader_builder.schema().clone();
            let indices: Vec<usize> = cols
                .iter()
                .filter_map(|col_name| parquet_schema.index_of(col_name).ok())
                .collect();

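            // ProjectionMask::roots selects top-level (root) Parquet columns by
            // index; for flat (non-nested) schemas these indices line up with
            // the Arrow schema's field order used above.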
            let mask =
                parquet::arrow::ProjectionMask::roots(reader_builder.parquet_schema(), indices);
            reader_builder
                .with_projection(mask)
                .build()
                .map_err(|e| ApiaryError::Storage {
                    message: format!("Failed to build Parquet reader: {}", e),
                    source: None,
                })?
        } else {
            reader_builder.build().map_err(|e| ApiaryError::Storage {
                message: format!("Failed to build Parquet reader: {}", e),
                source: None,
            })?
        };

        let mut batches = Vec::new();
        for batch_result in reader {
            let batch = batch_result.map_err(|e| ApiaryError::Storage {
                message: format!("Failed to read Parquet batch: {}", e),
                source: None,
            })?;
            batches.push(batch);
        }

        Ok(batches)
    }

    /// Read multiple cells and return all of their record batches as a single list.
    pub async fn read_cells(
        &self,
        cells: &[&CellMetadata],
        projection: Option<&[String]>,
    ) -> Result<Vec<RecordBatch>> {
        let mut all_batches = Vec::new();

        for cell in cells {
            let batches = self.read_cell(cell, projection).await?;
            all_batches.extend(batches);
        }

        Ok(all_batches)
    }

    /// Read cells and merge them into a single RecordBatch.
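    ///
    /// Returns `Ok(None)` when the cells contain no record batches.
    ///
    /// A sketch, where `manifest_cells` stands in for cell metadata obtained
    /// from the frame's manifest:
    ///
    /// ```ignore
    /// let cells: Vec<&CellMetadata> = manifest_cells.iter().collect();
    /// if let Some(batch) = reader.read_cells_merged(&cells, None).await? {
    ///     println!("merged {} rows", batch.num_rows());
    /// }
    /// ```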
    pub async fn read_cells_merged(
        &self,
        cells: &[&CellMetadata],
        projection: Option<&[String]>,
    ) -> Result<Option<RecordBatch>> {
        let batches = self.read_cells(cells, projection).await?;

        if batches.is_empty() {
            return Ok(None);
        }

        let schema = batches[0].schema();
        concat_batches(&schema, &batches)
    }
}

/// Concatenate multiple RecordBatches with the same schema into one.
fn concat_batches(schema: &Arc<Schema>, batches: &[RecordBatch]) -> Result<Option<RecordBatch>> {
    if batches.is_empty() {
        return Ok(None);
    }

    let merged =
        arrow::compute::concat_batches(schema, batches).map_err(|e| ApiaryError::Internal {
            message: format!("Failed to concatenate record batches: {}", e),
        })?;

    Ok(Some(merged))
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cell_writer::CellWriter;
    use crate::local::LocalBackend;
    use apiary_core::{CellSizingPolicy, FieldDef, FrameSchema};
    use arrow::array::*;
    use arrow::datatypes::{DataType, Field};

    async fn make_storage() -> (Arc<dyn StorageBackend>, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let backend = LocalBackend::new(dir.path().to_path_buf()).await.unwrap();
        (Arc::new(backend), dir)
    }

    fn test_schema() -> FrameSchema {
        FrameSchema {
            fields: vec![
                FieldDef {
                    name: "name".into(),
                    data_type: "string".into(),
                    nullable: false,
                },
                FieldDef {
                    name: "value".into(),
                    data_type: "float64".into(),
                    nullable: true,
                },
            ],
        }
    }

    fn test_batch() -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("value", DataType::Float64, true),
        ]));

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["alpha", "beta", "gamma"])),
                Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])),
            ],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_read_cell_roundtrip() {
        let (storage, _dir) = make_storage().await;
        let frame_path = "test_hive/test_box/test_frame";
        let schema = test_schema();
        let sizing = CellSizingPolicy::new(256 * 1024 * 1024, 512 * 1024 * 1024, 16 * 1024 * 1024);

        let writer = CellWriter::new(storage.clone(), frame_path.into(), schema, vec![], sizing);

        let batch = test_batch();
        let cells = writer.write(&batch).await.unwrap();
        assert_eq!(cells.len(), 1);

        let reader = CellReader::new(storage, frame_path.into());
        let read_batches = reader.read_cell(&cells[0], None).await.unwrap();
        assert_eq!(read_batches.len(), 1);
        assert_eq!(read_batches[0].num_rows(), 3);
    }

    #[tokio::test]
    async fn test_read_cell_with_projection() {
        let (storage, _dir) = make_storage().await;
        let frame_path = "test_hive/test_box/test_frame";
        let schema = test_schema();
        let sizing = CellSizingPolicy::new(256 * 1024 * 1024, 512 * 1024 * 1024, 16 * 1024 * 1024);

        let writer = CellWriter::new(storage.clone(), frame_path.into(), schema, vec![], sizing);

        let batch = test_batch();
        let cells = writer.write(&batch).await.unwrap();

        let reader = CellReader::new(storage, frame_path.into());
        let projection = vec!["name".to_string()];
        let read_batches = reader
            .read_cell(&cells[0], Some(&projection))
            .await
            .unwrap();

        assert_eq!(read_batches[0].num_columns(), 1);
        assert_eq!(read_batches[0].schema().field(0).name(), "name");
    }
}