zarr_datafusion/reader/zarr_reader.rs

//! Zarr array reader that flattens nD data into Arrow RecordBatches
//!
//! See [`super::schema_inference`] for assumptions about Zarr store structure
//! (1D coordinates, nD data variables as Cartesian product of coordinates).
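//!
//! For example, coordinates `time` (length 2) and `lat` (length 3) flatten into a
//! 6-row batch: coordinate columns are dictionary-encoded and repeat to cover the
//! Cartesian product, and each data variable contributes one value per row.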

use tracing::{debug, info, instrument, warn};

use arrow::{
    array::{ArrayRef, Float32Array, Float64Array, Int64Array, RecordBatch, RecordBatchOptions},
    datatypes::{DataType, Schema, SchemaRef},
};
use datafusion::{
    common::DataFusionError, error::Result, execution::SendableRecordBatchStream,
    physical_plan::stream::RecordBatchStreamAdapter,
};
use futures::stream;
use std::sync::Arc;
use std::time::Instant;
use zarrs::{array::Array, array_subset::ArraySubset, filesystem::FilesystemStore};

use super::coord::{
    calculate_coord_limits, calculate_limited_subset, create_coord_dictionary_typed, CoordValues,
};
use super::filter::{
    calculate_coord_ranges, calculate_filtered_rows, coord_ranges_to_array_ranges, CoordFilters,
    CoordValuesRef,
};
use super::schema_inference::discover_arrays;
use super::stats::SharedIoStats;
use super::tracked_store::TrackedStore;

fn zarr_err(e: impl std::error::Error + Send + Sync + 'static) -> DataFusionError {
    DataFusionError::External(Box::new(e))
}

/// Get element size in bytes for a Zarr data type string
fn dtype_to_bytes(dtype: &str) -> u64 {
    match dtype {
        "float32" | "int32" | "uint32" => 4,
        "float64" | "int64" | "uint64" => 8,
        "int16" | "uint16" => 2,
        "int8" | "uint8" => 1,
        _ => 8, // Default assumption
    }
}

/// Get element size in bytes for an Arrow DataType
fn arrow_dtype_to_bytes(dtype: &DataType) -> u64 {
    match dtype {
        DataType::Float32 | DataType::Int32 | DataType::UInt32 => 4,
        DataType::Float64 | DataType::Int64 | DataType::UInt64 => 8,
        DataType::Int16 | DataType::UInt16 => 2,
        DataType::Int8 | DataType::UInt8 => 1,
        _ => 8, // Default assumption
    }
}

// =============================================================================
// Macros for type-dispatched array reading (reduces ~90 lines of duplication)
//
// We maintain both sync and async paths because Tokio uses thread pools for
// file I/O rather than io_uring, adding ~1-5μs overhead per operation. For
// Zarr's many-chunk workloads, sync is faster for local files.
// =============================================================================

/// Macro to read coordinate array values with type dispatch.
/// Handles both sync and async variants of zarrs array retrieval.
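/// Called as e.g. `read_coord_values!(sync, arr, &subset, dtype.as_str())`.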
macro_rules! read_coord_values {
    // Sync version - uses retrieve_array_subset_ndarray
    (sync, $arr:expr, $subset:expr, $dtype:expr) => {
        match $dtype {
            "float32" => {
                let (vals, _) = $arr
                    .retrieve_array_subset_ndarray::<f32>($subset)
                    .map_err(zarr_err)?
                    .into_raw_vec_and_offset();
                CoordValues::Float32(vals)
            }
            "float64" => {
                let (vals, _) = $arr
                    .retrieve_array_subset_ndarray::<f64>($subset)
                    .map_err(zarr_err)?
                    .into_raw_vec_and_offset();
                CoordValues::Float64(vals)
            }
            _ => {
                let (vals, _) = $arr
                    .retrieve_array_subset_ndarray::<i64>($subset)
                    .map_err(zarr_err)?
                    .into_raw_vec_and_offset();
                CoordValues::Int64(vals)
            }
        }
    };
    // Async version - uses async_retrieve_array_subset_ndarray
    (async, $arr:expr, $subset:expr, $dtype:expr) => {
        match $dtype {
            "float32" => {
                let (vals, _) = $arr
                    .async_retrieve_array_subset_ndarray::<f32>($subset)
                    .await
                    .map_err(zarr_err)?
                    .into_raw_vec_and_offset();
                CoordValues::Float32(vals)
            }
            "float64" => {
                let (vals, _) = $arr
                    .async_retrieve_array_subset_ndarray::<f64>($subset)
                    .await
                    .map_err(zarr_err)?
                    .into_raw_vec_and_offset();
                CoordValues::Float64(vals)
            }
            _ => {
                let (vals, _) = $arr
                    .async_retrieve_array_subset_ndarray::<i64>($subset)
                    .await
                    .map_err(zarr_err)?
                    .into_raw_vec_and_offset();
                CoordValues::Int64(vals)
            }
        }
    };
}

/// Macro to read data variable array with type dispatch.
/// Returns an ArrayRef based on the Arrow DataType.
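/// Called as e.g. `read_data_array!(sync, arr, &subset, field.data_type())`.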
macro_rules! read_data_array {
    // Sync version
    (sync, $arr:expr, $subset:expr, $data_type:expr) => {
        match $data_type {
            DataType::Float32 => {
                let (vals, _) = $arr
                    .retrieve_array_subset_ndarray::<f32>($subset)
                    .map_err(zarr_err)?
                    .into_raw_vec_and_offset();
                Arc::new(Float32Array::from(vals)) as ArrayRef
            }
            DataType::Float64 => {
                let (vals, _) = $arr
                    .retrieve_array_subset_ndarray::<f64>($subset)
                    .map_err(zarr_err)?
                    .into_raw_vec_and_offset();
                Arc::new(Float64Array::from(vals)) as ArrayRef
            }
            _ => {
                let (vals, _) = $arr
                    .retrieve_array_subset_ndarray::<i64>($subset)
                    .map_err(zarr_err)?
                    .into_raw_vec_and_offset();
                Arc::new(Int64Array::from(vals)) as ArrayRef
            }
        }
    };
    // Async version
    (async, $arr:expr, $subset:expr, $data_type:expr) => {
        match $data_type {
            DataType::Float32 => {
                let (vals, _) = $arr
                    .async_retrieve_array_subset_ndarray::<f32>($subset)
                    .await
                    .map_err(zarr_err)?
                    .into_raw_vec_and_offset();
                Arc::new(Float32Array::from(vals)) as ArrayRef
            }
            DataType::Float64 => {
                let (vals, _) = $arr
                    .async_retrieve_array_subset_ndarray::<f64>($subset)
                    .await
                    .map_err(zarr_err)?
                    .into_raw_vec_and_offset();
                Arc::new(Float64Array::from(vals)) as ArrayRef
            }
            _ => {
                let (vals, _) = $arr
                    .async_retrieve_array_subset_ndarray::<i64>($subset)
                    .await
                    .map_err(zarr_err)?
                    .into_raw_vec_and_offset();
                Arc::new(Int64Array::from(vals)) as ArrayRef
            }
        }
    };
}

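/// Read a local Zarr store into a stream containing a single flattened `RecordBatch`.
///
/// Coordinates become dictionary-encoded columns repeated over the Cartesian
/// product of coordinate sizes; data variables are read as filtered subsets.
/// Projection, limit, and coordinate-filter pushdown are applied before the
/// batch is built, and per-read byte/timing stats are recorded when `stats`
/// is provided.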
pub fn read_zarr(
    store_path: &str,
    schema: SchemaRef,
    projection: Option<Vec<usize>>,
    limit: Option<usize>,
    stats: Option<SharedIoStats>,
    coord_filters: Option<CoordFilters>,
) -> Result<SendableRecordBatchStream> {
    let fs_store = Arc::new(FilesystemStore::new(store_path).map_err(zarr_err)?);

    // Wrap with TrackedStore if stats are provided
    let store: Arc<TrackedStore<FilesystemStore>> = Arc::new(TrackedStore::new(
        fs_store,
        stats.clone().unwrap_or_default(),
    ));

    // Discover store structure (with timing)
    let meta_start = Instant::now();
    let store_meta = discover_arrays(store_path).map_err(DataFusionError::External)?;
    if let Some(ref s) = stats {
        // TODO: Track actual metadata bytes read in discover_arrays() instead of estimating
        let meta_bytes = (store_meta.coords.len() + store_meta.data_vars.len()) as u64 * 500;
        s.record_metadata(meta_bytes, meta_start.elapsed());
    }

    let coord_names: Vec<_> = store_meta.coords.iter().map(|c| c.name.clone()).collect();
    let coord_types: Vec<_> = store_meta
        .coords
        .iter()
        .map(|c| c.data_type.clone())
        .collect();

    // Load coordinate arrays and get their sizes
    let mut coord_sizes: Vec<usize> = Vec::new();
    let mut coord_values: Vec<CoordValues> = Vec::new();

    for (coord, dtype) in store_meta.coords.iter().zip(coord_types.iter()) {
        let read_start = Instant::now();
        let arr = Array::open(store.clone(), &format!("/{}", coord.name)).map_err(zarr_err)?;
        let size = arr.shape()[0] as usize;
        coord_sizes.push(size);

        let subset = ArraySubset::new_with_shape(arr.shape().to_vec());
        let element_bytes = dtype_to_bytes(dtype);
        let values = read_coord_values!(sync, arr, &subset, dtype.as_str());

        if let Some(ref s) = stats {
            let bytes = size as u64 * element_bytes;
            s.record_coord(bytes, read_start.elapsed());
        }
        coord_values.push(values);
    }

    // Total rows = product of all coordinate sizes (before filtering)
    let total_rows: usize = coord_sizes.iter().product();

    // Calculate coordinate ranges based on filters
    let coord_ranges = if let Some(ref filters) = coord_filters {
        // Convert coord_values to refs for filtering
        let coord_refs: Vec<CoordValuesRef> = coord_values
            .iter()
            .map(|v| match v {
                CoordValues::Int64(vals) => CoordValuesRef::Int64(vals),
                CoordValues::Float32(vals) => CoordValuesRef::Float32(vals),
                CoordValues::Float64(vals) => CoordValuesRef::Float64(vals),
            })
            .collect();

        match calculate_coord_ranges(filters, &coord_names, &coord_refs) {
            Some(ranges) => {
                let filtered_rows = calculate_filtered_rows(&ranges);
                let reduction_pct = 100.0 * (1.0 - (filtered_rows as f64 / total_rows as f64));
                info!(
                    total_rows,
                    filtered_rows,
                    reduction_pct = format!("{:.2}%", reduction_pct),
                    filters = ?filters.filters.keys().collect::<Vec<_>>(),
                    "Filter pushdown optimization"
                );
                Some(ranges)
            }
            None => {
                // Filter value not found - return empty result
                warn!("Filter value not found in coordinates - returning empty result");
                let projected_schema = Arc::new(Schema::new(
                    projection
                        .as_ref()
                        .map(|indices| {
                            indices
                                .iter()
                                .map(|&i| schema.field(i).as_ref().clone())
                                .collect::<Vec<_>>()
                        })
                        .unwrap_or_else(|| {
                            schema.fields().iter().map(|f| f.as_ref().clone()).collect()
                        }),
                ));
                let batch = RecordBatch::new_empty(projected_schema.clone());
                let stream = stream::iter(vec![Ok(batch)]);
                return Ok(Box::pin(RecordBatchStreamAdapter::new(
                    projected_schema,
                    stream,
                )));
            }
        }
    } else {
        None
    };

    // Calculate effective sizes based on filters
    let (effective_coord_sizes, effective_rows) = if let Some(ref ranges) = coord_ranges {
        let sizes: Vec<usize> = ranges.iter().map(|(start, end)| end - start).collect();
        let rows = calculate_filtered_rows(ranges);
        (sizes, rows)
    } else {
        (coord_sizes.clone(), total_rows)
    };

    // Extract filtered coordinate values
    let filtered_coord_values: Vec<CoordValues> = if let Some(ref ranges) = coord_ranges {
        coord_values
            .iter()
            .zip(ranges.iter())
            .map(|(values, (start, end))| match values {
                CoordValues::Int64(vals) => CoordValues::Int64(vals[*start..*end].to_vec()),
                CoordValues::Float32(vals) => CoordValues::Float32(vals[*start..*end].to_vec()),
                CoordValues::Float64(vals) => CoordValues::Float64(vals[*start..*end].to_vec()),
            })
            .collect()
    } else {
        coord_values
    };

    let total_columns = schema.fields().len();
    let projected_indices = projection.unwrap_or_else(|| (0..total_columns).collect());

    // Log projection optimization effect
    let skipped_columns = total_columns - projected_indices.len();
    if skipped_columns > 0 {
        let projected_names: Vec<_> = projected_indices
            .iter()
            .map(|&i| schema.field(i).name().as_str())
            .collect();
        info!(
            reading = projected_indices.len(),
            skipping = skipped_columns,
            columns = ?projected_names,
            "Projection optimization"
        );
    } else {
        info!(
            columns = total_columns,
            "No projection optimization (all columns)"
        );
    }

    // Apply limit (after filter reduction)
    let final_rows = limit
        .map(|l| l.min(effective_rows))
        .unwrap_or(effective_rows);
    if let Some(limit) = limit {
        if limit < effective_rows {
            let reduction_pct = 100.0 * (1.0 - (final_rows as f64 / effective_rows as f64));
            info!(
                effective_rows,
                final_rows,
                reduction_pct = format!("{:.2}%", reduction_pct),
                "Limit optimization"
            );
        }
    }

    let mut result_arrays: Vec<ArrayRef> = Vec::new();

    for idx in &projected_indices {
        let field = schema.field(*idx);
        let field_name = field.name();

        // Check if this is a coordinate
        if let Some(coord_idx) = coord_names.iter().position(|n| n == field_name) {
            // Create DictionaryArray for coordinate (memory efficient)
            let dict_array = create_coord_dictionary_typed(
                &filtered_coord_values[coord_idx],
                coord_idx,
                &effective_coord_sizes,
                effective_rows,
            );
            result_arrays.push(dict_array);
        } else {
            // Data variable - read filtered subset
            let read_start = Instant::now();
            let arr = Array::open(store.clone(), &format!("/{}", field_name)).map_err(zarr_err)?;

            // Calculate the subset to read based on coordinate filters
            let subset = if let Some(ref ranges) = coord_ranges {
                let array_ranges = coord_ranges_to_array_ranges(ranges);
                ArraySubset::new_with_ranges(&array_ranges)
            } else {
                ArraySubset::new_with_shape(arr.shape().to_vec())
            };
            let num_elements: u64 = subset.num_elements();

            let array: ArrayRef = read_data_array!(sync, arr, &subset, field.data_type());

            if let Some(ref s) = stats {
                let bytes = num_elements * arrow_dtype_to_bytes(field.data_type());
                s.record_data(bytes, read_start.elapsed());
            }
            result_arrays.push(array);
        }
    }

    let projected_schema = Arc::new(Schema::new(
        projected_indices
            .iter()
            .map(|&i| schema.field(i).clone())
            .collect::<Vec<_>>(),
    ));

    // Apply limit if specified (slice the already-filtered arrays)
    let result_arrays = if let Some(limit) = limit {
        let limit = limit.min(effective_rows);
        result_arrays
            .into_iter()
            .map(|arr| arr.slice(0, limit))
            .collect()
    } else {
        result_arrays
    };

    // Handle empty projection (e.g., count(*)) - need to set row count explicitly
    let batch = if result_arrays.is_empty() {
        info!(final_rows, "Empty projection - returning row count only");
        RecordBatch::try_new_with_options(
            projected_schema.clone(),
            result_arrays,
            &RecordBatchOptions::new().with_row_count(Some(final_rows)),
        )?
    } else {
        RecordBatch::try_new(projected_schema.clone(), result_arrays)?
    };
    let stream = stream::iter(vec![Ok(batch)]);

    Ok(Box::pin(RecordBatchStreamAdapter::new(
        projected_schema,
        stream,
    )))
}

// =============================================================================
// Async version for remote object stores
// =============================================================================

use super::schema_inference::{discover_arrays_async, ZarrStoreMeta};
use zarrs::storage::AsyncReadableListableStorage;
use zarrs_object_store::object_store::path::Path as ObjectPath;

/// Async version of `read_zarr` for remote object stores.
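///
/// Accepts pre-discovered store metadata via `cached_meta` to skip discovery on
/// repeated reads; filter, projection, and limit pushdown mirror `read_zarr`.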
#[allow(clippy::too_many_arguments)]
#[instrument(level = "info", skip_all)]
pub async fn read_zarr_async(
    store: AsyncReadableListableStorage,
    prefix: &ObjectPath,
    schema: SchemaRef,
    projection: Option<Vec<usize>>,
    limit: Option<usize>,
    stats: Option<SharedIoStats>,
    cached_meta: Option<ZarrStoreMeta>,
    coord_filters: Option<CoordFilters>,
) -> Result<SendableRecordBatchStream> {
    info!("Starting async Zarr read");

    // Use cached metadata if available, otherwise discover
    let store_meta = if let Some(meta) = cached_meta {
        info!("Using cached metadata");
        meta
    } else {
        debug!("Discovering store metadata");
        let meta_start = Instant::now();
        let meta = discover_arrays_async(&store, prefix)
            .await
            .map_err(DataFusionError::External)?;
        debug!(elapsed = ?meta_start.elapsed(), "Metadata discovery complete");

        if let Some(ref s) = stats {
            // TODO: Track actual metadata bytes read
            let meta_bytes = (meta.coords.len() + meta.data_vars.len()) as u64 * 500;
            s.record_metadata(meta_bytes, meta_start.elapsed());
        }
        meta
    };

    let coord_names: Vec<_> = store_meta.coords.iter().map(|c| c.name.clone()).collect();
    let coord_types: Vec<_> = store_meta
        .coords
        .iter()
        .map(|c| c.data_type.clone())
        .collect();

    // Get coordinate sizes from metadata (already discovered)
    let coord_sizes: Vec<usize> = store_meta
        .coords
        .iter()
        .map(|c| c.shape[0] as usize)
        .collect();
    debug!(?coord_names, ?coord_sizes, "Coordinate info");

    // Total rows = product of all coordinate sizes (before filtering)
    let total_rows: usize = coord_sizes.iter().product();

    // First, load all coordinate values (needed for filter matching)
    debug!("Loading coordinate values for filter matching");
    let mut all_coord_values: Vec<CoordValues> = Vec::new();

    for (coord, dtype) in store_meta.coords.iter().zip(coord_types.iter()) {
        let read_start = Instant::now();
        let array_path = format!("/{}/{}", prefix, coord.name);

        let arr = Array::async_open(store.clone(), &array_path)
            .await
            .map_err(zarr_err)?;

        let subset = ArraySubset::new_with_shape(arr.shape().to_vec());
        let element_bytes = dtype_to_bytes(dtype);
        let values = read_coord_values!(async, arr, &subset, dtype.as_str());

        if let Some(ref s) = stats {
            let bytes = coord.shape[0] * element_bytes;
            s.record_coord(bytes, read_start.elapsed());
        }
        all_coord_values.push(values);
    }

    // Calculate coordinate ranges based on filters
    let coord_ranges = if let Some(ref filters) = coord_filters {
        let coord_refs: Vec<CoordValuesRef> = all_coord_values
            .iter()
            .map(|v| match v {
                CoordValues::Int64(vals) => CoordValuesRef::Int64(vals),
                CoordValues::Float32(vals) => CoordValuesRef::Float32(vals),
                CoordValues::Float64(vals) => CoordValuesRef::Float64(vals),
            })
            .collect();

        match calculate_coord_ranges(filters, &coord_names, &coord_refs) {
            Some(ranges) => {
                let filtered_rows = calculate_filtered_rows(&ranges);
                let reduction_pct = 100.0 * (1.0 - (filtered_rows as f64 / total_rows as f64));
                info!(
                    total_rows,
                    filtered_rows,
                    reduction_pct = format!("{:.2}%", reduction_pct),
                    filters = ?filters.filters.keys().collect::<Vec<_>>(),
                    "Filter pushdown optimization"
                );
                Some(ranges)
            }
            None => {
                // Filter value not found - return empty result
                warn!("Filter value not found in coordinates - returning empty result");
                let projected_schema = Arc::new(Schema::new(
                    projection
                        .as_ref()
                        .map(|indices| {
                            indices
                                .iter()
                                .map(|&i| schema.field(i).as_ref().clone())
                                .collect::<Vec<_>>()
                        })
                        .unwrap_or_else(|| {
                            schema.fields().iter().map(|f| f.as_ref().clone()).collect()
                        }),
                ));
                let batch = RecordBatch::new_empty(projected_schema.clone());
                let stream = stream::iter(vec![Ok(batch)]);
                return Ok(Box::pin(RecordBatchStreamAdapter::new(
                    projected_schema,
                    stream,
                )));
            }
        }
    } else {
        None
    };
    debug!(?coord_ranges, "Coordinate ranges calculated");

    // Calculate effective sizes based on filters
    let (effective_coord_sizes, rows_after_filter) = if let Some(ref ranges) = coord_ranges {
        let sizes: Vec<usize> = ranges.iter().map(|(start, end)| end - start).collect();
        let rows = calculate_filtered_rows(ranges);
        (sizes, rows)
    } else {
        (coord_sizes.clone(), total_rows)
    };

    // Extract filtered coordinate values
    let filtered_coord_values: Vec<CoordValues> = if let Some(ref ranges) = coord_ranges {
        all_coord_values
            .iter()
            .zip(ranges.iter())
            .map(|(values, (start, end))| match values {
                CoordValues::Int64(vals) => CoordValues::Int64(vals[*start..*end].to_vec()),
                CoordValues::Float32(vals) => CoordValues::Float32(vals[*start..*end].to_vec()),
                CoordValues::Float64(vals) => CoordValues::Float64(vals[*start..*end].to_vec()),
            })
            .collect()
    } else {
        all_coord_values
    };

    // Apply limit (after filter reduction)
    let effective_rows = limit
        .map(|l| l.min(rows_after_filter))
        .unwrap_or(rows_after_filter);

    // Log limit optimization effect
    if effective_rows < rows_after_filter {
        let reduction_pct = 100.0 * (1.0 - (effective_rows as f64 / rows_after_filter as f64));
        info!(
            rows_after_filter,
            effective_rows,
            reduction_pct = format!("{:.2}%", reduction_pct),
            "Limit optimization applied"
        );
    }

    // Calculate how many values we need from each coordinate (for limit optimization on top of filter)
    let coord_value_limits = if effective_rows < rows_after_filter {
        calculate_coord_limits(&effective_coord_sizes, effective_rows)
    } else {
        effective_coord_sizes.clone()
    };

    info!("Coordinates loaded and filtered");

    let total_columns = schema.fields().len();
    let projected_indices = projection.unwrap_or_else(|| (0..total_columns).collect());

    // Log projection optimization effect
    let skipped_columns = total_columns - projected_indices.len();
    if skipped_columns > 0 {
        let projected_names: Vec<_> = projected_indices
            .iter()
            .map(|&i| schema.field(i).name().as_str())
            .collect();
        info!(
            reading = projected_indices.len(),
            skipping = skipped_columns,
            columns = ?projected_names,
            "Projection optimization"
        );
    } else {
        info!(
            columns = total_columns,
            "No projection optimization (all columns)"
        );
    }

    let mut result_arrays: Vec<ArrayRef> = Vec::new();

    for idx in &projected_indices {
        let field = schema.field(*idx);
        let field_name = field.name();

        // Check if this is a coordinate
        if let Some(coord_idx) = coord_names.iter().position(|n| n == field_name) {
            debug!(field = %field_name, "Building dictionary array for coordinate");
            // Create DictionaryArray for coordinate (memory efficient)
            let dict_array = create_coord_dictionary_typed(
                &filtered_coord_values[coord_idx],
                coord_idx,
                &coord_value_limits,
                effective_rows,
            );
            result_arrays.push(dict_array);
        } else {
            // Data variable - read filtered subset
            debug!(field = %field_name, "Reading data variable");
            let read_start = Instant::now();
            let array_path = format!("/{}/{}", prefix, field_name);
            debug!(path = %array_path, "Opening data variable array");

            let arr = Array::async_open(store.clone(), &array_path)
                .await
                .map_err(zarr_err)?;
            debug!(shape = ?arr.shape(), "Data variable shape");

            // Calculate the subset to read based on coordinate filters
            let full_elements: u64 = arr.shape().iter().product();
            let subset = if let Some(ref ranges) = coord_ranges {
                let array_ranges = coord_ranges_to_array_ranges(ranges);
                let filtered_subset = ArraySubset::new_with_ranges(&array_ranges);
                let subset_elements = filtered_subset.num_elements();
                let reduction_pct = 100.0 * (1.0 - (subset_elements as f64 / full_elements as f64));
                info!(
                    field = %field_name,
                    subset_elements,
                    full_elements,
                    reduction_pct = format!("{:.2}%", reduction_pct),
                    "Filter-based data subset optimization"
                );
                filtered_subset
            } else if effective_rows < total_rows {
                let ranges = calculate_limited_subset(arr.shape(), effective_rows);
                let limited_subset = ArraySubset::new_with_ranges(&ranges);
                let subset_elements = limited_subset.num_elements();
                let reduction_pct = 100.0 * (1.0 - (subset_elements as f64 / full_elements as f64));
                info!(
                    field = %field_name,
                    subset_elements,
                    full_elements,
                    reduction_pct = format!("{:.2}%", reduction_pct),
                    "Limit-based data subset optimization"
                );
                limited_subset
            } else {
                debug!(field = %field_name, full_elements, "Reading full array");
                ArraySubset::new_with_shape(arr.shape().to_vec())
            };
            let num_elements: u64 = subset.num_elements();

            let array: ArrayRef = read_data_array!(async, arr, &subset, field.data_type());

            debug!(elapsed = ?read_start.elapsed(), "Data variable read complete");
            if let Some(ref s) = stats {
                let bytes = num_elements * arrow_dtype_to_bytes(field.data_type());
                s.record_data(bytes, read_start.elapsed());
            }
            result_arrays.push(array);
        }
    }

    debug!("Building projected schema");
    let projected_schema = Arc::new(Schema::new(
        projected_indices
            .iter()
            .map(|&i| schema.field(i).clone())
            .collect::<Vec<_>>(),
    ));

    // Apply final limit slice if needed
    let final_rows = limit
        .map(|l| l.min(rows_after_filter))
        .unwrap_or(rows_after_filter);
    let result_arrays = if let Some(limit) = limit {
        let limit = limit.min(rows_after_filter);
        debug!(limit, "Applying final limit slice");
        result_arrays
            .into_iter()
            .map(|arr| arr.slice(0, limit))
            .collect()
    } else {
        result_arrays
    };

    // Handle empty projection (e.g., count(*)) - need to set row count explicitly
    let batch = if result_arrays.is_empty() {
        info!(final_rows, "Empty projection - returning row count only");
        RecordBatch::try_new_with_options(
            projected_schema.clone(),
            result_arrays,
            &RecordBatchOptions::new().with_row_count(Some(final_rows)),
        )?
    } else {
        RecordBatch::try_new(projected_schema.clone(), result_arrays)?
    };
    info!(
        num_rows = batch.num_rows(),
        num_columns = batch.num_columns(),
        "RecordBatch created successfully"
    );

    let stream = stream::iter(vec![Ok(batch)]);

    Ok(Box::pin(RecordBatchStreamAdapter::new(
        projected_schema,
        stream,
    )))
}