1use std::pin::Pin;
21use std::sync::Arc;
22use std::task::Context;
23use std::task::Poll;
24
25#[cfg(test)]
26use super::metrics::ExecutionPlanMetricsSet;
27use super::metrics::{BaselineMetrics, SplitMetrics};
28use super::{ExecutionPlan, RecordBatchStream, SendableRecordBatchStream};
29use crate::displayable;
30use crate::spill::get_record_batch_memory_size;
31
32use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
33use datafusion_common::{Result, exec_err};
34use datafusion_common_runtime::JoinSet;
35use datafusion_execution::TaskContext;
36use datafusion_execution::memory_pool::MemoryReservation;
37
38use futures::ready;
39use futures::stream::BoxStream;
40use futures::{Future, Stream, StreamExt};
41use log::debug;
42use pin_project_lite::pin_project;
43use tokio::runtime::Handle;
44use tokio::sync::mpsc::{Receiver, Sender};
45
46pub(crate) struct ReceiverStreamBuilder<O> {
58 tx: Sender<Result<O>>,
59 rx: Receiver<Result<O>>,
60 join_set: JoinSet<Result<()>>,
61}
62
63impl<O: Send + 'static> ReceiverStreamBuilder<O> {
64 pub fn new(capacity: usize) -> Self {
66 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
67
68 Self {
69 tx,
70 rx,
71 join_set: JoinSet::new(),
72 }
73 }
74
75 pub fn tx(&self) -> Sender<Result<O>> {
77 self.tx.clone()
78 }
79
80 pub fn spawn<F>(&mut self, task: F)
83 where
84 F: Future<Output = Result<()>>,
85 F: Send + 'static,
86 {
87 self.join_set.spawn(task);
88 }
89
90 pub fn spawn_on<F>(&mut self, task: F, handle: &Handle)
92 where
93 F: Future<Output = Result<()>>,
94 F: Send + 'static,
95 {
96 self.join_set.spawn_on(task, handle);
97 }
98
99 pub fn spawn_blocking<F>(&mut self, f: F)
105 where
106 F: FnOnce() -> Result<()>,
107 F: Send + 'static,
108 {
109 self.join_set.spawn_blocking(f);
110 }
111
112 pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle)
114 where
115 F: FnOnce() -> Result<()>,
116 F: Send + 'static,
117 {
118 self.join_set.spawn_blocking_on(f, handle);
119 }
120
121 pub fn build(self) -> BoxStream<'static, Result<O>> {
123 let Self {
124 tx,
125 rx,
126 mut join_set,
127 } = self;
128
129 drop(tx);
131
132 let check = async move {
134 while let Some(result) = join_set.join_next().await {
135 match result {
136 Ok(task_result) => {
137 match task_result {
138 Ok(_) => continue,
140 Err(error) => return Some(Err(error)),
142 }
143 }
144 Err(e) => {
146 if e.is_panic() {
147 std::panic::resume_unwind(e.into_panic());
149 } else {
150 return Some(exec_err!("Non Panic Task error: {e}"));
156 }
157 }
158 }
159 }
160 None
161 };
162
163 let check_stream = futures::stream::once(check)
164 .filter_map(|item| async move { item });
166
167 let rx_stream = futures::stream::unfold(rx, |mut rx| async move {
169 let next_item = rx.recv().await;
170 next_item.map(|next_item| (next_item, rx))
171 });
172
173 futures::stream::select(rx_stream, check_stream).boxed()
176 }
177}
178
179pub struct RecordBatchReceiverStreamBuilder {
240 schema: SchemaRef,
241 inner: ReceiverStreamBuilder<RecordBatch>,
242}
243
244impl RecordBatchReceiverStreamBuilder {
245 pub fn new(schema: SchemaRef, capacity: usize) -> Self {
247 Self {
248 schema,
249 inner: ReceiverStreamBuilder::new(capacity),
250 }
251 }
252
253 pub fn tx(&self) -> Sender<Result<RecordBatch>> {
259 self.inner.tx()
260 }
261
262 pub fn spawn<F>(&mut self, task: F)
269 where
270 F: Future<Output = Result<()>>,
271 F: Send + 'static,
272 {
273 self.inner.spawn(task)
274 }
275
276 pub fn spawn_on<F>(&mut self, task: F, handle: &Handle)
278 where
279 F: Future<Output = Result<()>>,
280 F: Send + 'static,
281 {
282 self.inner.spawn_on(task, handle)
283 }
284
285 pub fn spawn_blocking<F>(&mut self, f: F)
305 where
306 F: FnOnce() -> Result<()>,
307 F: Send + 'static,
308 {
309 self.inner.spawn_blocking(f)
310 }
311
312 pub fn spawn_blocking_on<F>(&mut self, f: F, handle: &Handle)
314 where
315 F: FnOnce() -> Result<()>,
316 F: Send + 'static,
317 {
318 self.inner.spawn_blocking_on(f, handle)
319 }
320
321 pub(crate) fn run_input(
327 &mut self,
328 input: Arc<dyn ExecutionPlan>,
329 partition: usize,
330 context: Arc<TaskContext>,
331 ) {
332 let output = self.tx();
333
334 self.inner.spawn(async move {
335 let mut stream = match input.execute(partition, context) {
336 Err(e) => {
337 output.send(Err(e)).await.ok();
340 debug!(
341 "Stopping execution: error executing input: {}",
342 displayable(input.as_ref()).one_line()
343 );
344 return Ok(());
345 }
346 Ok(stream) => stream,
347 };
348
349 while let Some(item) = stream.next().await {
352 let is_err = item.is_err();
353
354 if output.send(item).await.is_err() {
357 debug!(
358 "Stopping execution: output is gone, plan cancelling: {}",
359 displayable(input.as_ref()).one_line()
360 );
361 return Ok(());
362 }
363
364 if is_err {
367 debug!(
368 "Stopping execution: plan returned error: {}",
369 displayable(input.as_ref()).one_line()
370 );
371 return Ok(());
372 }
373 }
374
375 Ok(())
376 });
377 }
378
379 pub fn build(self) -> SendableRecordBatchStream {
381 Box::pin(RecordBatchStreamAdapter::new(
382 self.schema,
383 self.inner.build(),
384 ))
385 }
386}
387
388#[doc(hidden)]
389pub struct RecordBatchReceiverStream {}
390
391impl RecordBatchReceiverStream {
392 pub fn builder(
394 schema: SchemaRef,
395 capacity: usize,
396 ) -> RecordBatchReceiverStreamBuilder {
397 RecordBatchReceiverStreamBuilder::new(schema, capacity)
398 }
399}
400
401pin_project! {
402 pub struct RecordBatchStreamAdapter<S> {
407 schema: SchemaRef,
408
409 #[pin]
410 stream: S,
411 }
412}
413
414impl<S> RecordBatchStreamAdapter<S> {
415 pub fn new(schema: SchemaRef, stream: S) -> Self {
439 Self { schema, stream }
440 }
441}
442
443impl<S> std::fmt::Debug for RecordBatchStreamAdapter<S> {
444 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
445 f.debug_struct("RecordBatchStreamAdapter")
446 .field("schema", &self.schema)
447 .finish()
448 }
449}
450
451impl<S> Stream for RecordBatchStreamAdapter<S>
452where
453 S: Stream<Item = Result<RecordBatch>>,
454{
455 type Item = Result<RecordBatch>;
456
457 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
458 self.project().stream.poll_next(cx)
459 }
460
461 fn size_hint(&self) -> (usize, Option<usize>) {
462 self.stream.size_hint()
463 }
464}
465
466impl<S> RecordBatchStream for RecordBatchStreamAdapter<S>
467where
468 S: Stream<Item = Result<RecordBatch>>,
469{
470 fn schema(&self) -> SchemaRef {
471 Arc::clone(&self.schema)
472 }
473}
474
475pub struct EmptyRecordBatchStream {
478 schema: SchemaRef,
480}
481
482impl EmptyRecordBatchStream {
483 pub fn new(schema: SchemaRef) -> Self {
485 Self { schema }
486 }
487}
488
489impl RecordBatchStream for EmptyRecordBatchStream {
490 fn schema(&self) -> SchemaRef {
491 Arc::clone(&self.schema)
492 }
493}
494
495impl Stream for EmptyRecordBatchStream {
496 type Item = Result<RecordBatch>;
497
498 fn poll_next(
499 self: Pin<&mut Self>,
500 _cx: &mut Context<'_>,
501 ) -> Poll<Option<Self::Item>> {
502 Poll::Ready(None)
503 }
504}
505
506pub(crate) struct ObservedStream {
509 inner: SendableRecordBatchStream,
510 baseline_metrics: BaselineMetrics,
511 fetch: Option<usize>,
512 produced: usize,
513}
514
515impl ObservedStream {
516 pub fn new(
517 inner: SendableRecordBatchStream,
518 baseline_metrics: BaselineMetrics,
519 fetch: Option<usize>,
520 ) -> Self {
521 Self {
522 inner,
523 baseline_metrics,
524 fetch,
525 produced: 0,
526 }
527 }
528
529 fn limit_reached(
530 &mut self,
531 poll: Poll<Option<Result<RecordBatch>>>,
532 ) -> Poll<Option<Result<RecordBatch>>> {
533 let Some(fetch) = self.fetch else { return poll };
534
535 if self.produced >= fetch {
536 return Poll::Ready(None);
537 }
538
539 if let Poll::Ready(Some(Ok(batch))) = &poll {
540 if self.produced + batch.num_rows() > fetch {
541 let batch = batch.slice(0, fetch.saturating_sub(self.produced));
542 self.produced += batch.num_rows();
543 return Poll::Ready(Some(Ok(batch)));
544 };
545 self.produced += batch.num_rows()
546 }
547 poll
548 }
549}
550
551impl RecordBatchStream for ObservedStream {
552 fn schema(&self) -> SchemaRef {
553 self.inner.schema()
554 }
555}
556
557impl Stream for ObservedStream {
558 type Item = Result<RecordBatch>;
559
560 fn poll_next(
561 mut self: Pin<&mut Self>,
562 cx: &mut Context<'_>,
563 ) -> Poll<Option<Self::Item>> {
564 let mut poll = self.inner.poll_next_unpin(cx);
565 if self.fetch.is_some() {
566 poll = self.limit_reached(poll);
567 }
568 self.baseline_metrics.record_poll(poll)
569 }
570}
571
572pin_project! {
573 pub struct BatchSplitStream {
591 #[pin]
592 input: SendableRecordBatchStream,
593 schema: SchemaRef,
594 batch_size: usize,
595 metrics: SplitMetrics,
596 current_batch: Option<RecordBatch>,
597 offset: usize,
598 }
599}
600
601impl BatchSplitStream {
602 pub fn new(
604 input: SendableRecordBatchStream,
605 batch_size: usize,
606 metrics: SplitMetrics,
607 ) -> Self {
608 let schema = input.schema();
609 Self {
610 input,
611 schema,
612 batch_size,
613 metrics,
614 current_batch: None,
615 offset: 0,
616 }
617 }
618
619 fn next_sliced_batch(&mut self) -> Option<Result<RecordBatch>> {
624 let batch = self.current_batch.take()?;
625
626 debug_assert!(
628 self.offset <= batch.num_rows(),
629 "Offset {} exceeds batch size {}",
630 self.offset,
631 batch.num_rows()
632 );
633
634 let remaining = batch.num_rows() - self.offset;
635 let to_take = remaining.min(self.batch_size);
636 let out = batch.slice(self.offset, to_take);
637
638 self.metrics.batches_split.add(1);
639 self.offset += to_take;
640 if self.offset < batch.num_rows() {
641 self.current_batch = Some(batch);
643 } else {
644 self.offset = 0;
647 }
648 Some(Ok(out))
649 }
650
651 fn poll_upstream(
657 &mut self,
658 cx: &mut Context<'_>,
659 ) -> Poll<Option<Result<RecordBatch>>> {
660 match ready!(self.input.as_mut().poll_next(cx)) {
661 Some(Ok(batch)) => {
662 if batch.num_rows() <= self.batch_size {
663 Poll::Ready(Some(Ok(batch)))
665 } else {
666 self.current_batch = Some(batch);
668 match self.next_sliced_batch() {
670 Some(result) => Poll::Ready(Some(result)),
671 None => Poll::Ready(None), }
673 }
674 }
675 Some(Err(e)) => Poll::Ready(Some(Err(e))),
676 None => Poll::Ready(None),
677 }
678 }
679}
680
681impl Stream for BatchSplitStream {
682 type Item = Result<RecordBatch>;
683
684 fn poll_next(
685 mut self: Pin<&mut Self>,
686 cx: &mut Context<'_>,
687 ) -> Poll<Option<Self::Item>> {
688 if let Some(result) = self.next_sliced_batch() {
690 return Poll::Ready(Some(result));
691 }
692
693 self.poll_upstream(cx)
695 }
696}
697
698impl RecordBatchStream for BatchSplitStream {
699 fn schema(&self) -> SchemaRef {
700 Arc::clone(&self.schema)
701 }
702}
703
704pub(crate) struct ReservationStream {
709 schema: SchemaRef,
710 inner: SendableRecordBatchStream,
711 reservation: MemoryReservation,
712}
713
714impl ReservationStream {
715 pub(crate) fn new(
716 schema: SchemaRef,
717 inner: SendableRecordBatchStream,
718 reservation: MemoryReservation,
719 ) -> Self {
720 Self {
721 schema,
722 inner,
723 reservation,
724 }
725 }
726}
727
728impl Stream for ReservationStream {
729 type Item = Result<RecordBatch>;
730
731 fn poll_next(
732 mut self: Pin<&mut Self>,
733 cx: &mut Context<'_>,
734 ) -> Poll<Option<Self::Item>> {
735 let res = self.inner.poll_next_unpin(cx);
736
737 match res {
738 Poll::Ready(res) => {
739 match res {
740 Some(Ok(batch)) => {
741 self.reservation
742 .shrink(get_record_batch_memory_size(&batch));
743 Poll::Ready(Some(Ok(batch)))
744 }
745 Some(Err(err)) => Poll::Ready(Some(Err(err))),
746 None => {
747 self.reservation.free();
749 Poll::Ready(None)
750 }
751 }
752 }
753 Poll::Pending => Poll::Pending,
754 }
755 }
756
757 fn size_hint(&self) -> (usize, Option<usize>) {
758 self.inner.size_hint()
759 }
760}
761
762impl RecordBatchStream for ReservationStream {
763 fn schema(&self) -> SchemaRef {
764 Arc::clone(&self.schema)
765 }
766}
767
768#[cfg(test)]
769mod test {
770 use super::*;
771 use crate::test::exec::{
772 BlockingExec, MockExec, PanicExec, assert_strong_count_converges_to_zero,
773 };
774
775 use arrow::datatypes::{DataType, Field, Schema};
776 use datafusion_common::exec_err;
777
778 fn schema() -> SchemaRef {
779 Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]))
780 }
781
782 #[tokio::test]
783 #[should_panic(expected = "PanickingStream did panic")]
784 async fn record_batch_receiver_stream_propagates_panics() {
785 let schema = schema();
786
787 let num_partitions = 10;
788 let input = PanicExec::new(Arc::clone(&schema), num_partitions);
789 consume(input, 10).await
790 }
791
792 #[tokio::test]
793 #[should_panic(expected = "PanickingStream did panic: 1")]
794 async fn record_batch_receiver_stream_propagates_panics_early_shutdown() {
795 let schema = schema();
796
797 let num_partitions = 2;
799 let input = PanicExec::new(Arc::clone(&schema), num_partitions)
800 .with_partition_panic(0, 10)
801 .with_partition_panic(1, 3); let max_batches = 5;
809 consume(input, max_batches).await
810 }
811
812 #[tokio::test]
813 async fn record_batch_receiver_stream_drop_cancel() {
814 let task_ctx = Arc::new(TaskContext::default());
815 let schema = schema();
816
817 let input = BlockingExec::new(Arc::clone(&schema), 1);
819 let refs = input.refs();
820
821 let mut builder = RecordBatchReceiverStream::builder(schema, 2);
823 builder.run_input(Arc::new(input), 0, Arc::clone(&task_ctx));
824 let stream = builder.build();
825
826 assert!(std::sync::Weak::strong_count(&refs) > 0);
828
829 drop(stream);
831 assert_strong_count_converges_to_zero(refs).await;
832 }
833
834 #[tokio::test]
835 async fn record_batch_receiver_stream_error_does_not_drive_completion() {
839 let task_ctx = Arc::new(TaskContext::default());
840 let schema = schema();
841
842 let error_stream = MockExec::new(
844 vec![exec_err!("Test1"), exec_err!("Test2")],
845 Arc::clone(&schema),
846 )
847 .with_use_task(false);
848
849 let mut builder = RecordBatchReceiverStream::builder(schema, 2);
850 builder.run_input(Arc::new(error_stream), 0, Arc::clone(&task_ctx));
851 let mut stream = builder.build();
852
853 let first_batch = stream.next().await.unwrap();
855 let first_err = first_batch.unwrap_err();
856 assert_eq!(first_err.strip_backtrace(), "Execution error: Test1");
857
858 assert!(stream.next().await.is_none());
860 }
861
862 #[tokio::test]
863 async fn batch_split_stream_basic_functionality() {
864 use arrow::array::{Int32Array, RecordBatch};
865 use futures::stream::{self, StreamExt};
866
867 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
868
869 let large_batch = RecordBatch::try_new(
871 Arc::clone(&schema),
872 vec![Arc::new(Int32Array::from((0..2000).collect::<Vec<_>>()))],
873 )
874 .unwrap();
875
876 let input_stream = stream::iter(vec![Ok(large_batch)]);
878 let adapter = RecordBatchStreamAdapter::new(Arc::clone(&schema), input_stream);
879 let batch_stream = Box::pin(adapter) as SendableRecordBatchStream;
880
881 let metrics = ExecutionPlanMetricsSet::new();
883 let split_metrics = SplitMetrics::new(&metrics, 0);
884 let mut split_stream = BatchSplitStream::new(batch_stream, 500, split_metrics);
885
886 let mut total_rows = 0;
887 let mut batch_count = 0;
888
889 while let Some(result) = split_stream.next().await {
890 let batch = result.unwrap();
891 assert!(batch.num_rows() <= 500, "Batch size should not exceed 500");
892 total_rows += batch.num_rows();
893 batch_count += 1;
894 }
895
896 assert_eq!(total_rows, 2000, "All rows should be preserved");
897 assert_eq!(batch_count, 4, "Should have 4 batches of 500 rows each");
898 }
899
900 async fn consume(input: PanicExec, max_batches: usize) {
905 let task_ctx = Arc::new(TaskContext::default());
906
907 let input = Arc::new(input);
908 let num_partitions = input.properties().output_partitioning().partition_count();
909
910 let mut builder =
912 RecordBatchReceiverStream::builder(input.schema(), num_partitions);
913 for partition in 0..num_partitions {
914 builder.run_input(
915 Arc::clone(&input) as Arc<dyn ExecutionPlan>,
916 partition,
917 Arc::clone(&task_ctx),
918 );
919 }
920 let mut stream = builder.build();
921
922 let mut num_batches = 0;
924 while let Some(next) = stream.next().await {
925 next.unwrap();
926 num_batches += 1;
927 assert!(
928 num_batches < max_batches,
929 "Got the limit of {num_batches} batches before seeing panic"
930 );
931 }
932 }
933
934 #[test]
935 fn record_batch_receiver_stream_builder_spawn_on_runtime() {
936 let tokio_runtime = tokio::runtime::Builder::new_multi_thread()
937 .enable_all()
938 .build()
939 .unwrap();
940
941 let mut builder =
942 RecordBatchReceiverStreamBuilder::new(Arc::new(Schema::empty()), 10);
943
944 let tx1 = builder.tx();
945 builder.spawn_on(
946 async move {
947 tx1.send(Ok(RecordBatch::new_empty(Arc::new(Schema::empty()))))
948 .await
949 .unwrap();
950
951 Ok(())
952 },
953 tokio_runtime.handle(),
954 );
955
956 let tx2 = builder.tx();
957 builder.spawn_blocking_on(
958 move || {
959 tx2.blocking_send(Ok(RecordBatch::new_empty(Arc::new(Schema::empty()))))
960 .unwrap();
961
962 Ok(())
963 },
964 tokio_runtime.handle(),
965 );
966
967 let mut stream = builder.build();
968
969 let mut number_of_batches = 0;
970
971 loop {
972 let poll = stream.poll_next_unpin(&mut Context::from_waker(
973 futures::task::noop_waker_ref(),
974 ));
975
976 match poll {
977 Poll::Ready(None) => {
978 break;
979 }
980 Poll::Ready(Some(Ok(batch))) => {
981 number_of_batches += 1;
982 assert_eq!(batch.num_rows(), 0);
983 }
984 Poll::Ready(Some(Err(e))) => panic!("Unexpected error: {e}"),
985 Poll::Pending => {
986 continue;
987 }
988 }
989 }
990
991 assert_eq!(
992 number_of_batches, 2,
993 "Should have received exactly two empty batches"
994 );
995 }
996
997 #[tokio::test]
998 async fn test_reservation_stream_shrinks_on_poll() {
999 use arrow::array::Int32Array;
1000 use datafusion_execution::memory_pool::MemoryConsumer;
1001 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1002
1003 let runtime = RuntimeEnvBuilder::new()
1004 .with_memory_limit(10 * 1024 * 1024, 1.0)
1005 .build_arc()
1006 .unwrap();
1007
1008 let mut reservation = MemoryConsumer::new("test").register(&runtime.memory_pool);
1009
1010 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1011
1012 let batch1 = RecordBatch::try_new(
1014 Arc::clone(&schema),
1015 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
1016 )
1017 .unwrap();
1018 let batch2 = RecordBatch::try_new(
1019 Arc::clone(&schema),
1020 vec![Arc::new(Int32Array::from(vec![6, 7, 8, 9, 10]))],
1021 )
1022 .unwrap();
1023
1024 let batch1_size = get_record_batch_memory_size(&batch1);
1025 let batch2_size = get_record_batch_memory_size(&batch2);
1026
1027 reservation.try_grow(batch1_size + batch2_size).unwrap();
1029 let initial_reserved = runtime.memory_pool.reserved();
1030 assert_eq!(initial_reserved, batch1_size + batch2_size);
1031
1032 let stream = futures::stream::iter(vec![Ok(batch1), Ok(batch2)]);
1034 let inner = Box::pin(RecordBatchStreamAdapter::new(Arc::clone(&schema), stream))
1035 as SendableRecordBatchStream;
1036
1037 let mut res_stream =
1038 ReservationStream::new(Arc::clone(&schema), inner, reservation);
1039
1040 let result1 = res_stream.next().await;
1042 assert!(result1.is_some());
1043
1044 let after_first = runtime.memory_pool.reserved();
1046 assert_eq!(after_first, batch2_size);
1047
1048 let result2 = res_stream.next().await;
1050 assert!(result2.is_some());
1051
1052 let after_second = runtime.memory_pool.reserved();
1054 assert_eq!(after_second, 0);
1055
1056 let result3 = res_stream.next().await;
1058 assert!(result3.is_none());
1059
1060 assert_eq!(runtime.memory_pool.reserved(), 0);
1062 }
1063
1064 #[tokio::test]
1065 async fn test_reservation_stream_error_handling() {
1066 use datafusion_execution::memory_pool::MemoryConsumer;
1067 use datafusion_execution::runtime_env::RuntimeEnvBuilder;
1068
1069 let runtime = RuntimeEnvBuilder::new()
1070 .with_memory_limit(10 * 1024 * 1024, 1.0)
1071 .build_arc()
1072 .unwrap();
1073
1074 let mut reservation = MemoryConsumer::new("test").register(&runtime.memory_pool);
1075
1076 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
1077
1078 reservation.try_grow(1000).unwrap();
1079 let initial = runtime.memory_pool.reserved();
1080 assert_eq!(initial, 1000);
1081
1082 let stream = futures::stream::iter(vec![exec_err!("Test error")]);
1084 let inner = Box::pin(RecordBatchStreamAdapter::new(Arc::clone(&schema), stream))
1085 as SendableRecordBatchStream;
1086
1087 let mut res_stream =
1088 ReservationStream::new(Arc::clone(&schema), inner, reservation);
1089
1090 let result = res_stream.next().await;
1092 assert!(result.is_some());
1093 assert!(result.unwrap().is_err());
1094
1095 let after_error = runtime.memory_pool.reserved();
1100 assert_eq!(
1101 after_error, 1000,
1102 "Reservation should still be held after error"
1103 );
1104
1105 drop(res_stream);
1107
1108 assert_eq!(
1110 runtime.memory_pool.reserved(),
1111 0,
1112 "Memory should be freed when stream is dropped"
1113 );
1114 }
1115}