1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use crate::error::Result;
use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use futures::{Stream, StreamExt};
use pin_project_lite::pin_project;
use tokio::task::JoinHandle;
use tokio_stream::wrappers::ReceiverStream;
use super::common::AbortOnDropSingle;
use super::{RecordBatchStream, SendableRecordBatchStream};
pub struct RecordBatchReceiverStream {
schema: SchemaRef,
inner: ReceiverStream<Result<RecordBatch>>,
#[allow(dead_code)]
drop_helper: AbortOnDropSingle<()>,
}
impl RecordBatchReceiverStream {
pub fn create(
schema: &SchemaRef,
rx: tokio::sync::mpsc::Receiver<Result<RecordBatch>>,
join_handle: JoinHandle<()>,
) -> SendableRecordBatchStream {
let schema = schema.clone();
let inner = ReceiverStream::new(rx);
Box::pin(Self {
schema,
inner,
drop_helper: AbortOnDropSingle::new(join_handle),
})
}
}
impl Stream for RecordBatchReceiverStream {
type Item = Result<RecordBatch>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.inner.poll_next_unpin(cx)
}
}
impl RecordBatchStream for RecordBatchReceiverStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
pin_project! {
pub struct RecordBatchStreamAdapter<S> {
schema: SchemaRef,
#[pin]
stream: S,
}
}
impl<S> RecordBatchStreamAdapter<S> {
pub fn new(schema: SchemaRef, stream: S) -> Self {
Self { schema, stream }
}
}
impl<S> std::fmt::Debug for RecordBatchStreamAdapter<S> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RecordBatchStreamAdapter")
.field("schema", &self.schema)
.finish()
}
}
impl<S> Stream for RecordBatchStreamAdapter<S>
where
S: Stream<Item = Result<RecordBatch>>,
{
type Item = Result<RecordBatch>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.project().stream.poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
impl<S> RecordBatchStream for RecordBatchStreamAdapter<S>
where
S: Stream<Item = Result<RecordBatch>>,
{
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}