1use std::any::Any;
5use std::collections::VecDeque;
6
7use async_trait::async_trait;
8use futures::StreamExt;
9use futures::future::BoxFuture;
10use futures::stream;
11use vortex_array::ArrayRef;
12use vortex_array::stream::ArrayStreamAdapter;
13use vortex_array::stream::SendableArrayStream;
14use vortex_dtype::DType;
15use vortex_error::VortexExpect;
16use vortex_error::VortexResult;
17use vortex_error::vortex_bail;
18use vortex_layout::LayoutReaderRef;
19use vortex_session::VortexSession;
20
21use crate::ScanBuilder;
22use crate::api::DataSource;
23use crate::api::DataSourceScan;
24use crate::api::DataSourceScanRef;
25use crate::api::Estimate;
26use crate::api::ScanRequest;
27use crate::api::Split;
28use crate::api::SplitRef;
29
30pub struct LayoutReaderDataSource {
32 reader: LayoutReaderRef,
33 session: VortexSession,
34}
35
36impl LayoutReaderDataSource {
37 pub fn new(reader: LayoutReaderRef, session: VortexSession) -> Self {
39 Self { reader, session }
40 }
41}
42
43#[async_trait]
44impl DataSource for LayoutReaderDataSource {
45 fn dtype(&self) -> &DType {
46 self.reader.dtype()
47 }
48
49 fn row_count_estimate(&self) -> Estimate<u64> {
50 Estimate::Exact(self.reader.row_count())
51 }
52
53 async fn scan(&self, scan_request: ScanRequest) -> VortexResult<DataSourceScanRef> {
54 let mut builder = ScanBuilder::new(self.session.clone(), self.reader.clone());
55
56 if let Some(projection) = scan_request.projection {
57 builder = builder.with_projection(projection);
58 }
59
60 if let Some(filter) = scan_request.filter {
61 builder = builder.with_filter(filter);
62 }
63
64 if let Some(limit) = scan_request.limit {
65 builder = builder.with_limit(limit);
66 }
67
68 let scan = builder.prepare()?;
69 let dtype = scan.dtype().clone();
70 let splits = scan.execute(None)?;
71
72 Ok(Box::new(LayoutReaderScan {
73 dtype,
74 splits: VecDeque::from_iter(splits),
75 }))
76 }
77
78 fn serialize_split(&self, _split: &dyn Split) -> VortexResult<Vec<u8>> {
79 vortex_bail!("LayoutReader splits are not yet serializable");
80 }
81
82 fn deserialize_split(&self, _split: &[u8]) -> VortexResult<SplitRef> {
83 vortex_bail!("LayoutReader splits are not yet serializable");
84 }
85}
86
87struct LayoutReaderScan {
88 dtype: DType,
89 splits: VecDeque<BoxFuture<'static, VortexResult<Option<ArrayRef>>>>,
90}
91
92#[async_trait]
93impl DataSourceScan for LayoutReaderScan {
94 fn dtype(&self) -> &DType {
95 &self.dtype
96 }
97
98 fn remaining_splits_estimate(&self) -> Estimate<usize> {
99 Estimate::Exact(self.splits.len())
100 }
101
102 async fn next_splits(&mut self, max_splits: usize) -> VortexResult<Vec<SplitRef>> {
103 let n = std::cmp::min(max_splits, self.splits.len());
104
105 let mut splits = Vec::with_capacity(n);
106 for _ in 0..n {
107 let fut = self
108 .splits
109 .pop_front()
110 .vortex_expect("Checked length above ensures we have enough splits");
111 splits.push(Box::new(LayoutReaderSplit {
112 dtype: self.dtype.clone(),
113 fut,
114 }) as SplitRef);
115 }
116
117 Ok(splits)
118 }
119}
120
121struct LayoutReaderSplit {
122 dtype: DType,
123 fut: BoxFuture<'static, VortexResult<Option<ArrayRef>>>,
124}
125
126#[async_trait]
127impl Split for LayoutReaderSplit {
128 fn as_any(&self) -> &dyn Any {
129 self
130 }
131
132 fn execute(self: Box<Self>) -> VortexResult<SendableArrayStream> {
133 Ok(Box::pin(ArrayStreamAdapter::new(
134 self.dtype,
135 stream::once(self.fut).filter_map(|a| async move { a.transpose() }),
136 )))
137 }
138
139 fn row_count_estimate(&self) -> Estimate<u64> {
140 Estimate::Unknown
141 }
142
143 fn byte_size_estimate(&self) -> Estimate<u64> {
144 Estimate::Unknown
145 }
146}