1use tracing::{debug, info, instrument, warn};
7
8use arrow::{
9 array::{ArrayRef, Float32Array, Float64Array, Int64Array, RecordBatch, RecordBatchOptions},
10 datatypes::{DataType, Schema, SchemaRef},
11};
12use datafusion::{
13 common::DataFusionError, error::Result, execution::SendableRecordBatchStream,
14 physical_plan::stream::RecordBatchStreamAdapter,
15};
16use futures::stream;
17use std::sync::Arc;
18use std::time::Instant;
19use zarrs::{array::Array, array_subset::ArraySubset, filesystem::FilesystemStore};
20
21use super::coord::{
22 calculate_coord_limits, calculate_limited_subset, create_coord_dictionary_typed, CoordValues,
23};
24use super::filter::{
25 calculate_coord_ranges, calculate_filtered_rows, coord_ranges_to_array_ranges, CoordFilters,
26 CoordValuesRef,
27};
28use super::schema_inference::discover_arrays;
29use super::stats::SharedIoStats;
30use super::tracked_store::TrackedStore;
31
32fn zarr_err(e: impl std::error::Error + Send + Sync + 'static) -> DataFusionError {
33 DataFusionError::External(Box::new(e))
34}
35
/// Returns the size in bytes of one element of the named dtype.
///
/// Recognises the 8/16/32/64-bit signed and unsigned integer names and the
/// 32/64-bit float names; every other name is counted as 8 bytes.
fn dtype_to_bytes(dtype: &str) -> u64 {
    const KNOWN_WIDTHS: [(&str, u64); 10] = [
        ("float32", 4),
        ("int32", 4),
        ("uint32", 4),
        ("float64", 8),
        ("int64", 8),
        ("uint64", 8),
        ("int16", 2),
        ("uint16", 2),
        ("int8", 1),
        ("uint8", 1),
    ];

    KNOWN_WIDTHS
        .iter()
        .find(|(name, _)| *name == dtype)
        // Unrecognised names fall back to 8 bytes.
        .map_or(8, |&(_, width)| width)
}
46
47fn arrow_dtype_to_bytes(dtype: &DataType) -> u64 {
49 match dtype {
50 DataType::Float32 | DataType::Int32 | DataType::UInt32 => 4,
51 DataType::Float64 | DataType::Int64 | DataType::UInt64 => 8,
52 DataType::Int16 | DataType::UInt16 => 2,
53 DataType::Int8 | DataType::UInt8 => 1,
54 _ => 8, }
56}
57
/// Reads a coordinate array subset and wraps the flat values in [`CoordValues`].
///
/// Usage: `read_coord_values!(sync|async, array, &subset, dtype_str)`.
///
/// * `"float32"` is read as `f32`, `"float64"` as `f64`.
/// * Every other dtype string is read as `i64`.
///
/// The expansion uses `?` (and `.await` in `async` mode), so it must be invoked
/// inside a function returning a DataFusion `Result` (and inside an async
/// context for `async` mode). Zarr errors are converted with `zarr_err`.
macro_rules! read_coord_values {
    // Internal: blocking retrieval of the subset as a flat `Vec<$elem>`.
    (@fetch sync, $arr:expr, $subset:expr, $elem:ty) => {
        $arr
            .retrieve_array_subset_ndarray::<$elem>($subset)
            .map_err(zarr_err)?
            .into_raw_vec_and_offset()
            .0
    };
    // Internal: awaiting retrieval of the subset as a flat `Vec<$elem>`.
    (@fetch async, $arr:expr, $subset:expr, $elem:ty) => {
        $arr
            .async_retrieve_array_subset_ndarray::<$elem>($subset)
            .await
            .map_err(zarr_err)?
            .into_raw_vec_and_offset()
            .0
    };
    // Public entry point: `$mode` is the bare token `sync` or `async`.
    ($mode:tt, $arr:expr, $subset:expr, $dtype:expr) => {
        match $dtype {
            "float32" => {
                CoordValues::Float32(read_coord_values!(@fetch $mode, $arr, $subset, f32))
            }
            "float64" => {
                CoordValues::Float64(read_coord_values!(@fetch $mode, $arr, $subset, f64))
            }
            // Anything that is not a recognised float dtype is read as i64.
            _ => CoordValues::Int64(read_coord_values!(@fetch $mode, $arr, $subset, i64)),
        }
    };
}
125
/// Reads a data-variable array subset and converts it into an Arrow [`ArrayRef`].
///
/// Usage: `read_data_array!(sync|async, array, &subset, arrow_data_type)`.
///
/// * `DataType::Float32` produces a `Float32Array`.
/// * `DataType::Float64` produces a `Float64Array`.
/// * Every other data type is read as `i64` into an `Int64Array`.
///
/// The expansion uses `?` (and `.await` in `async` mode), so it must be invoked
/// inside a function returning a DataFusion `Result` (and inside an async
/// context for `async` mode). Zarr errors are converted with `zarr_err`.
macro_rules! read_data_array {
    // Internal: blocking read of `$elem` values wrapped in the `$arrow` array type.
    (@column sync, $arr:expr, $subset:expr, $elem:ty, $arrow:ident) => {{
        let raw = $arr
            .retrieve_array_subset_ndarray::<$elem>($subset)
            .map_err(zarr_err)?
            .into_raw_vec_and_offset()
            .0;
        Arc::new($arrow::from(raw)) as ArrayRef
    }};
    // Internal: awaiting read of `$elem` values wrapped in the `$arrow` array type.
    (@column async, $arr:expr, $subset:expr, $elem:ty, $arrow:ident) => {{
        let raw = $arr
            .async_retrieve_array_subset_ndarray::<$elem>($subset)
            .await
            .map_err(zarr_err)?
            .into_raw_vec_and_offset()
            .0;
        Arc::new($arrow::from(raw)) as ArrayRef
    }};
    // Public entry point: `$mode` is the bare token `sync` or `async`.
    ($mode:tt, $arr:expr, $subset:expr, $data_type:expr) => {
        match $data_type {
            DataType::Float32 => {
                read_data_array!(@column $mode, $arr, $subset, f32, Float32Array)
            }
            DataType::Float64 => {
                read_data_array!(@column $mode, $arr, $subset, f64, Float64Array)
            }
            // Every non-float data type is read as i64.
            _ => read_data_array!(@column $mode, $arr, $subset, i64, Int64Array),
        }
    };
}
185
186pub fn read_zarr(
187 store_path: &str,
188 schema: SchemaRef,
189 projection: Option<Vec<usize>>,
190 limit: Option<usize>,
191 stats: Option<SharedIoStats>,
192 coord_filters: Option<CoordFilters>,
193) -> Result<SendableRecordBatchStream> {
194 let fs_store = Arc::new(FilesystemStore::new(store_path).map_err(zarr_err)?);
195
196 let store: Arc<TrackedStore<FilesystemStore>> = Arc::new(TrackedStore::new(
198 fs_store,
199 stats.clone().unwrap_or_default(),
200 ));
201
202 let meta_start = Instant::now();
204 let store_meta = discover_arrays(store_path).map_err(DataFusionError::External)?;
205 if let Some(ref s) = stats {
206 let meta_bytes = (store_meta.coords.len() + store_meta.data_vars.len()) as u64 * 500;
208 s.record_metadata(meta_bytes, meta_start.elapsed());
209 }
210
211 let coord_names: Vec<_> = store_meta.coords.iter().map(|c| c.name.clone()).collect();
212 let coord_types: Vec<_> = store_meta
213 .coords
214 .iter()
215 .map(|c| c.data_type.clone())
216 .collect();
217
218 let mut coord_sizes: Vec<usize> = Vec::new();
220 let mut coord_values: Vec<CoordValues> = Vec::new();
221
222 for (coord, dtype) in store_meta.coords.iter().zip(coord_types.iter()) {
223 let read_start = Instant::now();
224 let arr = Array::open(store.clone(), &format!("/{}", coord.name)).map_err(zarr_err)?;
225 let size = arr.shape()[0] as usize;
226 coord_sizes.push(size);
227
228 let subset = ArraySubset::new_with_shape(arr.shape().to_vec());
229 let element_bytes = dtype_to_bytes(dtype);
230 let values = read_coord_values!(sync, arr, &subset, dtype.as_str());
231
232 if let Some(ref s) = stats {
233 let bytes = size as u64 * element_bytes;
234 s.record_coord(bytes, read_start.elapsed());
235 }
236 coord_values.push(values);
237 }
238
239 let total_rows: usize = coord_sizes.iter().product();
241
242 let coord_ranges = if let Some(ref filters) = coord_filters {
244 let coord_refs: Vec<CoordValuesRef> = coord_values
246 .iter()
247 .map(|v| match v {
248 CoordValues::Int64(vals) => CoordValuesRef::Int64(vals),
249 CoordValues::Float32(vals) => CoordValuesRef::Float32(vals),
250 CoordValues::Float64(vals) => CoordValuesRef::Float64(vals),
251 })
252 .collect();
253
254 match calculate_coord_ranges(filters, &coord_names, &coord_refs) {
255 Some(ranges) => {
256 let filtered_rows = calculate_filtered_rows(&ranges);
257 let reduction_pct = 100.0 * (1.0 - (filtered_rows as f64 / total_rows as f64));
258 info!(
259 total_rows,
260 filtered_rows,
261 reduction_pct = format!("{:.2}%", reduction_pct),
262 filters = ?filters.filters.keys().collect::<Vec<_>>(),
263 "Filter pushdown optimization"
264 );
265 Some(ranges)
266 }
267 None => {
268 warn!("Filter value not found in coordinates - returning empty result");
270 let projected_schema = Arc::new(Schema::new(
271 projection
272 .as_ref()
273 .map(|indices| {
274 indices
275 .iter()
276 .map(|&i| schema.field(i).as_ref().clone())
277 .collect::<Vec<_>>()
278 })
279 .unwrap_or_else(|| {
280 schema.fields().iter().map(|f| f.as_ref().clone()).collect()
281 }),
282 ));
283 let batch = RecordBatch::new_empty(projected_schema.clone());
284 let stream = stream::iter(vec![Ok(batch)]);
285 return Ok(Box::pin(RecordBatchStreamAdapter::new(
286 projected_schema,
287 stream,
288 )));
289 }
290 }
291 } else {
292 None
293 };
294
295 let (effective_coord_sizes, effective_rows) = if let Some(ref ranges) = coord_ranges {
297 let sizes: Vec<usize> = ranges.iter().map(|(start, end)| end - start).collect();
298 let rows = calculate_filtered_rows(ranges);
299 (sizes, rows)
300 } else {
301 (coord_sizes.clone(), total_rows)
302 };
303
304 let filtered_coord_values: Vec<CoordValues> = if let Some(ref ranges) = coord_ranges {
306 coord_values
307 .iter()
308 .zip(ranges.iter())
309 .map(|(values, (start, end))| match values {
310 CoordValues::Int64(vals) => CoordValues::Int64(vals[*start..*end].to_vec()),
311 CoordValues::Float32(vals) => CoordValues::Float32(vals[*start..*end].to_vec()),
312 CoordValues::Float64(vals) => CoordValues::Float64(vals[*start..*end].to_vec()),
313 })
314 .collect()
315 } else {
316 coord_values
317 };
318
319 let total_columns = schema.fields().len();
320 let projected_indices = projection.unwrap_or_else(|| (0..total_columns).collect());
321
322 let skipped_columns = total_columns - projected_indices.len();
324 if skipped_columns > 0 {
325 let projected_names: Vec<_> = projected_indices
326 .iter()
327 .map(|&i| schema.field(i).name().as_str())
328 .collect();
329 info!(
330 reading = projected_indices.len(),
331 skipping = skipped_columns,
332 columns = ?projected_names,
333 "Projection optimization"
334 );
335 } else {
336 info!(
337 columns = total_columns,
338 "No projection optimization (all columns)"
339 );
340 }
341
342 let final_rows = limit
344 .map(|l| l.min(effective_rows))
345 .unwrap_or(effective_rows);
346 if let Some(limit) = limit {
347 if limit < effective_rows {
348 let reduction_pct = 100.0 * (1.0 - (final_rows as f64 / effective_rows as f64));
349 info!(
350 effective_rows,
351 final_rows,
352 reduction_pct = format!("{:.2}%", reduction_pct),
353 "Limit optimization"
354 );
355 }
356 }
357
358 let mut result_arrays: Vec<ArrayRef> = Vec::new();
359
360 for idx in &projected_indices {
361 let field = schema.field(*idx);
362 let field_name = field.name();
363
364 if let Some(coord_idx) = coord_names.iter().position(|n| n == field_name) {
366 let dict_array = create_coord_dictionary_typed(
368 &filtered_coord_values[coord_idx],
369 coord_idx,
370 &effective_coord_sizes,
371 effective_rows,
372 );
373 result_arrays.push(dict_array);
374 } else {
375 let read_start = Instant::now();
377 let arr = Array::open(store.clone(), &format!("/{}", field_name)).map_err(zarr_err)?;
378
379 let subset = if let Some(ref ranges) = coord_ranges {
381 let array_ranges = coord_ranges_to_array_ranges(ranges);
382 ArraySubset::new_with_ranges(&array_ranges)
383 } else {
384 ArraySubset::new_with_shape(arr.shape().to_vec())
385 };
386 let num_elements: u64 = subset.num_elements();
387
388 let array: ArrayRef = read_data_array!(sync, arr, &subset, field.data_type());
389
390 if let Some(ref s) = stats {
391 let bytes = num_elements * arrow_dtype_to_bytes(field.data_type());
392 s.record_data(bytes, read_start.elapsed());
393 }
394 result_arrays.push(array);
395 }
396 }
397
398 let projected_schema = Arc::new(Schema::new(
399 projected_indices
400 .iter()
401 .map(|&i| schema.field(i).clone())
402 .collect::<Vec<_>>(),
403 ));
404
405 let result_arrays = if let Some(limit) = limit {
407 let limit = limit.min(effective_rows);
408 result_arrays
409 .into_iter()
410 .map(|arr| arr.slice(0, limit))
411 .collect()
412 } else {
413 result_arrays
414 };
415
416 let batch = if result_arrays.is_empty() {
418 info!(final_rows, "Empty projection - returning row count only");
419 RecordBatch::try_new_with_options(
420 projected_schema.clone(),
421 result_arrays,
422 &RecordBatchOptions::new().with_row_count(Some(final_rows)),
423 )?
424 } else {
425 RecordBatch::try_new(projected_schema.clone(), result_arrays)?
426 };
427 let stream = stream::iter(vec![Ok(batch)]);
428
429 Ok(Box::pin(RecordBatchStreamAdapter::new(
430 projected_schema,
431 stream,
432 )))
433}
434
435use super::schema_inference::{discover_arrays_async, ZarrStoreMeta};
440use zarrs::storage::AsyncReadableListableStorage;
441use zarrs_object_store::object_store::path::Path as ObjectPath;
442
443#[allow(clippy::too_many_arguments)]
445#[instrument(level = "info", skip_all)]
446pub async fn read_zarr_async(
447 store: AsyncReadableListableStorage,
448 prefix: &ObjectPath,
449 schema: SchemaRef,
450 projection: Option<Vec<usize>>,
451 limit: Option<usize>,
452 stats: Option<SharedIoStats>,
453 cached_meta: Option<ZarrStoreMeta>,
454 coord_filters: Option<CoordFilters>,
455) -> Result<SendableRecordBatchStream> {
456 info!("Starting async Zarr read");
457
458 let store_meta = if let Some(meta) = cached_meta {
460 info!("Using cached metadata");
461 meta
462 } else {
463 debug!("Discovering store metadata");
464 let meta_start = Instant::now();
465 let meta = discover_arrays_async(&store, prefix)
466 .await
467 .map_err(DataFusionError::External)?;
468 debug!(elapsed = ?meta_start.elapsed(), "Metadata discovery complete");
469
470 if let Some(ref s) = stats {
471 let meta_bytes = (meta.coords.len() + meta.data_vars.len()) as u64 * 500;
473 s.record_metadata(meta_bytes, meta_start.elapsed());
474 }
475 meta
476 };
477
478 let coord_names: Vec<_> = store_meta.coords.iter().map(|c| c.name.clone()).collect();
479 let coord_types: Vec<_> = store_meta
480 .coords
481 .iter()
482 .map(|c| c.data_type.clone())
483 .collect();
484
485 let coord_sizes: Vec<usize> = store_meta
487 .coords
488 .iter()
489 .map(|c| c.shape[0] as usize)
490 .collect();
491 debug!(?coord_names, ?coord_sizes, "Coordinate info");
492
493 let total_rows: usize = coord_sizes.iter().product();
495
496 debug!("Loading coordinate values for filter matching");
498 let mut all_coord_values: Vec<CoordValues> = Vec::new();
499
500 for (coord, dtype) in store_meta.coords.iter().zip(coord_types.iter()) {
501 let read_start = Instant::now();
502 let array_path = format!("/{}/{}", prefix, coord.name);
503
504 let arr = Array::async_open(store.clone(), &array_path)
505 .await
506 .map_err(zarr_err)?;
507
508 let subset = ArraySubset::new_with_shape(arr.shape().to_vec());
509 let element_bytes = dtype_to_bytes(dtype);
510 let values = read_coord_values!(async, arr, &subset, dtype.as_str());
511
512 if let Some(ref s) = stats {
513 let bytes = coord.shape[0] * element_bytes;
514 s.record_coord(bytes, read_start.elapsed());
515 }
516 all_coord_values.push(values);
517 }
518
519 let coord_ranges = if let Some(ref filters) = coord_filters {
521 let coord_refs: Vec<CoordValuesRef> = all_coord_values
522 .iter()
523 .map(|v| match v {
524 CoordValues::Int64(vals) => CoordValuesRef::Int64(vals),
525 CoordValues::Float32(vals) => CoordValuesRef::Float32(vals),
526 CoordValues::Float64(vals) => CoordValuesRef::Float64(vals),
527 })
528 .collect();
529
530 match calculate_coord_ranges(filters, &coord_names, &coord_refs) {
531 Some(ranges) => {
532 let filtered_rows = calculate_filtered_rows(&ranges);
533 let reduction_pct = 100.0 * (1.0 - (filtered_rows as f64 / total_rows as f64));
534 info!(
535 total_rows,
536 filtered_rows,
537 reduction_pct = format!("{:.2}%", reduction_pct),
538 filters = ?filters.filters.keys().collect::<Vec<_>>(),
539 "Filter pushdown optimization"
540 );
541 Some(ranges)
542 }
543 None => {
544 warn!("Filter value not found in coordinates - returning empty result");
546 let projected_schema = Arc::new(Schema::new(
547 projection
548 .as_ref()
549 .map(|indices| {
550 indices
551 .iter()
552 .map(|&i| schema.field(i).as_ref().clone())
553 .collect::<Vec<_>>()
554 })
555 .unwrap_or_else(|| {
556 schema.fields().iter().map(|f| f.as_ref().clone()).collect()
557 }),
558 ));
559 let batch = RecordBatch::new_empty(projected_schema.clone());
560 let stream = stream::iter(vec![Ok(batch)]);
561 return Ok(Box::pin(RecordBatchStreamAdapter::new(
562 projected_schema,
563 stream,
564 )));
565 }
566 }
567 } else {
568 None
569 };
570 debug!(?coord_ranges, "Coordinate ranges calculated");
571
572 let (effective_coord_sizes, rows_after_filter) = if let Some(ref ranges) = coord_ranges {
574 let sizes: Vec<usize> = ranges.iter().map(|(start, end)| end - start).collect();
575 let rows = calculate_filtered_rows(ranges);
576 (sizes, rows)
577 } else {
578 (coord_sizes.clone(), total_rows)
579 };
580
581 let filtered_coord_values: Vec<CoordValues> = if let Some(ref ranges) = coord_ranges {
583 all_coord_values
584 .iter()
585 .zip(ranges.iter())
586 .map(|(values, (start, end))| match values {
587 CoordValues::Int64(vals) => CoordValues::Int64(vals[*start..*end].to_vec()),
588 CoordValues::Float32(vals) => CoordValues::Float32(vals[*start..*end].to_vec()),
589 CoordValues::Float64(vals) => CoordValues::Float64(vals[*start..*end].to_vec()),
590 })
591 .collect()
592 } else {
593 all_coord_values
594 };
595
596 let effective_rows = limit
598 .map(|l| l.min(rows_after_filter))
599 .unwrap_or(rows_after_filter);
600
601 if effective_rows < rows_after_filter {
603 let reduction_pct = 100.0 * (1.0 - (effective_rows as f64 / rows_after_filter as f64));
604 info!(
605 rows_after_filter,
606 effective_rows,
607 reduction_pct = format!("{:.2}%", reduction_pct),
608 "Limit optimization applied"
609 );
610 }
611
612 let coord_value_limits = if effective_rows < rows_after_filter {
614 calculate_coord_limits(&effective_coord_sizes, effective_rows)
615 } else {
616 effective_coord_sizes.clone()
617 };
618
619 info!("Coordinates loaded and filtered");
620
621 let total_columns = schema.fields().len();
622 let projected_indices = projection.unwrap_or_else(|| (0..total_columns).collect());
623
624 let skipped_columns = total_columns - projected_indices.len();
626 if skipped_columns > 0 {
627 let projected_names: Vec<_> = projected_indices
628 .iter()
629 .map(|&i| schema.field(i).name().as_str())
630 .collect();
631 info!(
632 reading = projected_indices.len(),
633 skipping = skipped_columns,
634 columns = ?projected_names,
635 "Projection optimization"
636 );
637 } else {
638 info!(
639 columns = total_columns,
640 "No projection optimization (all columns)"
641 )
642 }
643
644 let mut result_arrays: Vec<ArrayRef> = Vec::new();
645
646 for idx in &projected_indices {
647 let field = schema.field(*idx);
648 let field_name = field.name();
649
650 if let Some(coord_idx) = coord_names.iter().position(|n| n == field_name) {
652 debug!(field = %field_name, "Building dictionary array for coordinate");
653 let dict_array = create_coord_dictionary_typed(
655 &filtered_coord_values[coord_idx],
656 coord_idx,
657 &coord_value_limits,
658 effective_rows,
659 );
660 result_arrays.push(dict_array);
661 } else {
662 debug!(field_name = %field_name, "Reading data variable");
664 let read_start = Instant::now();
665 let array_path = format!("/{}/{}", prefix, field_name);
666 debug!(path = %array_path, "Opening data variable array");
667
668 let arr = Array::async_open(store.clone(), &array_path)
669 .await
670 .map_err(zarr_err)?;
671 debug!(shape = ?arr.shape(), "Data variable shape");
672
673 let full_elements: u64 = arr.shape().iter().product();
675 let subset = if let Some(ref ranges) = coord_ranges {
676 let array_ranges = coord_ranges_to_array_ranges(ranges);
677 let filtered_subset = ArraySubset::new_with_ranges(&array_ranges);
678 let subset_elements = filtered_subset.num_elements();
679 let reduction_pct = 100.0 * (1.0 - (subset_elements as f64 / full_elements as f64));
680 info!(
681 field = %field_name,
682 subset_elements,
683 full_elements,
684 reduction_pct = format!("{:.2}%", reduction_pct),
685 "Filter-based data subset optimization"
686 );
687 filtered_subset
688 } else if effective_rows < total_rows {
689 let ranges = calculate_limited_subset(arr.shape(), effective_rows);
690 let limited_subset = ArraySubset::new_with_ranges(&ranges);
691 let subset_elements = limited_subset.num_elements();
692 let reduction_pct = 100.0 * (1.0 - (subset_elements as f64 / full_elements as f64));
693 info!(
694 field = %field_name,
695 subset_elements,
696 full_elements,
697 reduction_pct = format!("{:.2}%", reduction_pct),
698 "Limit-based data subset optimization"
699 );
700 limited_subset
701 } else {
702 debug!(field = %field_name, full_elements, "Reading full array");
703 ArraySubset::new_with_shape(arr.shape().to_vec())
704 };
705 let num_elements: u64 = subset.num_elements();
706
707 let array: ArrayRef = read_data_array!(async, arr, &subset, field.data_type());
708
709 debug!(elapsed = ?read_start.elapsed(), "Data variable read complete");
710 if let Some(ref s) = stats {
711 let bytes = num_elements * arrow_dtype_to_bytes(field.data_type());
712 s.record_data(bytes, read_start.elapsed());
713 }
714 result_arrays.push(array);
715 }
716 }
717
718 debug!("Building projected schema");
719 let projected_schema = Arc::new(Schema::new(
720 projected_indices
721 .iter()
722 .map(|&i| schema.field(i).clone())
723 .collect::<Vec<_>>(),
724 ));
725
726 let final_rows = limit
728 .map(|l| l.min(rows_after_filter))
729 .unwrap_or(rows_after_filter);
730 let result_arrays = if let Some(limit) = limit {
731 let limit = limit.min(rows_after_filter);
732 debug!(limit, "Applying final limit slice");
733 result_arrays
734 .into_iter()
735 .map(|arr| arr.slice(0, limit))
736 .collect()
737 } else {
738 result_arrays
739 };
740
741 let batch = if result_arrays.is_empty() {
743 info!(final_rows, "Empty projection - returning row count only");
744 RecordBatch::try_new_with_options(
745 projected_schema.clone(),
746 result_arrays,
747 &RecordBatchOptions::new().with_row_count(Some(final_rows)),
748 )?
749 } else {
750 RecordBatch::try_new(projected_schema.clone(), result_arrays)?
751 };
752 info!(
753 num_rows = batch.num_rows(),
754 num_columns = batch.num_columns(),
755 "RecordBatch created successfully"
756 );
757
758 let stream = stream::iter(vec![Ok(batch)]);
759
760 Ok(Box::pin(RecordBatchStreamAdapter::new(
761 projected_schema,
762 stream,
763 )))
764}