grafeo_engine/query/executor/
stream.rs1use std::sync::atomic::{AtomicUsize, Ordering};
20use std::time::Instant;
21
22use grafeo_common::types::{LogicalType, Value};
23use grafeo_common::utils::error::{Error, QueryError, Result};
24use grafeo_core::execution::DataChunk;
25use grafeo_core::execution::operators::Operator;
26
27use crate::database::QueryResult;
28
/// Guard that accounts for one live stream on a shared counter.
///
/// Creating the guard adds one to the counter and dropping it subtracts one,
/// so the counter equals the number of guards currently alive.
///
/// The type is `#[must_use]`: a guard that is created and immediately
/// discarded (`StreamGuard::new(&c);`) increments and decrements back to back
/// and therefore tracks nothing.
#[must_use = "the counter is decremented again as soon as the guard is dropped"]
pub(crate) struct StreamGuard<'s> {
    /// Counter shared with the creator of the guard.
    counter: &'s AtomicUsize,
}

impl<'s> StreamGuard<'s> {
    /// Increments `counter` and returns a guard that will decrement it on drop.
    pub(crate) fn new(counter: &'s AtomicUsize) -> Self {
        counter.fetch_add(1, Ordering::AcqRel);
        Self { counter }
    }
}

impl Drop for StreamGuard<'_> {
    /// Decrements the counter that was incremented in [`StreamGuard::new`].
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::AcqRel);
    }
}
49
/// Pull-based stream of query result chunks, borrowed for `'session`.
///
/// Chunks are pulled from the wrapped operator on demand via `next_chunk`,
/// or row by row through `into_row_iter`, or all at once through `collect`.
pub struct ResultStream<'session> {
    /// Operator the chunks are pulled from.
    operator: Box<dyn Operator>,
    /// Names of the result columns, as supplied at construction.
    columns: Vec<String>,
    /// Per-column types; every entry starts as `LogicalType::Any` and is
    /// refined from the chunks as they are pulled.
    column_types: Vec<LogicalType>,
    /// Optional point in time after which pulling a chunk fails with a
    /// timeout error.
    deadline: Option<Instant>,
    /// Set once the operator has reported end of data; later pulls return
    /// `Ok(None)` without touching the operator.
    exhausted: bool,
    /// Held only for its `Drop` side effect (decrementing its counter).
    /// Fields drop in declaration order, so the guard is released after the
    /// operator. NOTE(review): presumably the counter is the session's
    /// live-stream count; confirm against the caller.
    _guard: StreamGuard<'session>,
}
65
66impl<'s> ResultStream<'s> {
67 pub(crate) fn new(
68 operator: Box<dyn Operator>,
69 columns: Vec<String>,
70 deadline: Option<Instant>,
71 guard: StreamGuard<'s>,
72 ) -> Self {
73 let len = columns.len();
74 Self {
75 operator,
76 columns,
77 column_types: vec![LogicalType::Any; len],
78 deadline,
79 exhausted: false,
80 _guard: guard,
81 }
82 }
83
84 #[must_use]
86 pub fn columns(&self) -> &[String] {
87 &self.columns
88 }
89
90 #[must_use]
92 pub fn column_types(&self) -> &[LogicalType] {
93 &self.column_types
94 }
95
96 pub fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
105 if self.exhausted {
106 return Ok(None);
107 }
108 check_deadline(self.deadline)?;
109 match self.operator.next() {
110 Ok(Some(chunk)) => {
111 refine_column_types(&chunk, &mut self.column_types);
112 Ok(Some(chunk))
113 }
114 Ok(None) => {
115 self.exhausted = true;
116 Ok(None)
117 }
118 Err(err) => Err(super::convert_operator_error(err)),
119 }
120 }
121
122 #[must_use]
124 pub fn into_row_iter(self) -> RowIterator<'s> {
125 RowIterator {
126 stream: self,
127 current: None,
128 cursor: 0,
129 }
130 }
131
132 pub fn collect(mut self) -> Result<QueryResult> {
141 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
142 while let Some(chunk) = self.next_chunk()? {
143 append_chunk(&chunk, &mut result);
144 }
145 result.column_types = self.column_types;
146 Ok(result)
147 }
148}
149
150pub struct RowIterator<'s> {
154 stream: ResultStream<'s>,
155 current: Option<(DataChunk, Vec<usize>)>,
158 cursor: usize,
159}
160
161impl RowIterator<'_> {
162 #[must_use]
164 pub fn columns(&self) -> &[String] {
165 self.stream.columns()
166 }
167}
168
169impl Iterator for RowIterator<'_> {
170 type Item = Result<Vec<Value>>;
171
172 fn next(&mut self) -> Option<Self::Item> {
173 loop {
174 if let Some((chunk, indices)) = &self.current {
175 if self.cursor < indices.len() {
176 let row_idx = indices[self.cursor];
177 self.cursor += 1;
178 return Some(Ok(extract_row(chunk, row_idx)));
179 }
180 self.current = None;
181 self.cursor = 0;
182 }
183 match self.stream.next_chunk() {
184 Ok(Some(chunk)) => {
185 if chunk.row_count() == 0 {
186 continue;
187 }
188 let indices: Vec<usize> = chunk.selected_indices().collect();
189 self.current = Some((chunk, indices));
190 self.cursor = 0;
191 }
192 Ok(None) => return None,
193 Err(err) => return Some(Err(err)),
194 }
195 }
196 }
197}
198
/// Pull-based stream of query result chunks that carries no lifetime
/// parameter and no stream guard.
///
/// Chunks are pulled from the wrapped operator on demand via `next_chunk`,
/// or row by row through `into_row_iter`, or all at once through `collect`.
pub struct OwnedResultStream {
    /// Operator the chunks are pulled from.
    operator: Box<dyn Operator>,
    /// Names of the result columns, as supplied at construction.
    columns: Vec<String>,
    /// Per-column types; every entry starts as `LogicalType::Any` and is
    /// refined from the chunks as they are pulled.
    column_types: Vec<LogicalType>,
    /// Optional point in time after which pulling a chunk fails with a
    /// timeout error.
    deadline: Option<Instant>,
    /// Set once the operator has reported end of data; later pulls return
    /// `Ok(None)` without touching the operator.
    exhausted: bool,
}
218
219impl std::fmt::Debug for OwnedResultStream {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("OwnedResultStream")
222 .field("columns", &self.columns)
223 .field("column_types", &self.column_types)
224 .field("deadline", &self.deadline)
225 .field("exhausted", &self.exhausted)
226 .finish_non_exhaustive()
227 }
228}
229
230impl OwnedResultStream {
231 pub(crate) fn new(
232 operator: Box<dyn Operator>,
233 columns: Vec<String>,
234 deadline: Option<Instant>,
235 ) -> Self {
236 let len = columns.len();
237 Self {
238 operator,
239 columns,
240 column_types: vec![LogicalType::Any; len],
241 deadline,
242 exhausted: false,
243 }
244 }
245
246 #[must_use]
248 pub fn columns(&self) -> &[String] {
249 &self.columns
250 }
251
252 #[must_use]
254 pub fn column_types(&self) -> &[LogicalType] {
255 &self.column_types
256 }
257
258 pub fn next_chunk(&mut self) -> Result<Option<DataChunk>> {
264 if self.exhausted {
265 return Ok(None);
266 }
267 check_deadline(self.deadline)?;
268 match self.operator.next() {
269 Ok(Some(chunk)) => {
270 refine_column_types(&chunk, &mut self.column_types);
271 Ok(Some(chunk))
272 }
273 Ok(None) => {
274 self.exhausted = true;
275 Ok(None)
276 }
277 Err(err) => Err(super::convert_operator_error(err)),
278 }
279 }
280
281 #[must_use]
283 pub fn into_row_iter(self) -> OwnedRowIterator {
284 OwnedRowIterator {
285 stream: self,
286 current: None,
287 cursor: 0,
288 }
289 }
290
291 pub fn collect(mut self) -> Result<QueryResult> {
297 let mut result = QueryResult::with_types(self.columns.clone(), self.column_types.clone());
298 while let Some(chunk) = self.next_chunk()? {
299 append_chunk(&chunk, &mut result);
300 }
301 result.column_types = self.column_types;
302 Ok(result)
303 }
304}
305
306pub struct OwnedRowIterator {
310 stream: OwnedResultStream,
311 current: Option<(DataChunk, Vec<usize>)>,
312 cursor: usize,
313}
314
315impl OwnedRowIterator {
316 #[must_use]
318 pub fn columns(&self) -> &[String] {
319 self.stream.columns()
320 }
321}
322
323impl Iterator for OwnedRowIterator {
324 type Item = Result<Vec<Value>>;
325
326 fn next(&mut self) -> Option<Self::Item> {
327 loop {
328 if let Some((chunk, indices)) = &self.current {
329 if self.cursor < indices.len() {
330 let row_idx = indices[self.cursor];
331 self.cursor += 1;
332 return Some(Ok(extract_row(chunk, row_idx)));
333 }
334 self.current = None;
335 self.cursor = 0;
336 }
337 match self.stream.next_chunk() {
338 Ok(Some(chunk)) => {
339 if chunk.row_count() == 0 {
340 continue;
341 }
342 let indices: Vec<usize> = chunk.selected_indices().collect();
343 self.current = Some((chunk, indices));
344 self.cursor = 0;
345 }
346 Ok(None) => return None,
347 Err(err) => return Some(Err(err)),
348 }
349 }
350 }
351}
352
353fn check_deadline(deadline: Option<Instant>) -> Result<()> {
358 #[cfg(not(target_arch = "wasm32"))]
359 if let Some(d) = deadline
360 && Instant::now() >= d
361 {
362 return Err(Error::Query(QueryError::timeout()));
363 }
364 #[cfg(target_arch = "wasm32")]
365 let _ = deadline;
366 Ok(())
367}
368
369fn refine_column_types(chunk: &DataChunk, types: &mut Vec<LogicalType>) {
370 let col_count = chunk.column_count();
371 if col_count == 0 {
372 return;
373 }
374 if types.len() != col_count {
375 types.resize(col_count, LogicalType::Any);
376 }
377 for (col_idx, slot) in types.iter_mut().enumerate().take(col_count) {
378 if matches!(slot, LogicalType::Any)
379 && let Some(col) = chunk.column(col_idx)
380 {
381 *slot = col.data_type().clone();
382 }
383 }
384}
385
386fn extract_row(chunk: &DataChunk, row_idx: usize) -> Vec<Value> {
387 let col_count = chunk.column_count();
388 let mut row = Vec::with_capacity(col_count);
389 for col_idx in 0..col_count {
390 let value = chunk
391 .column(col_idx)
392 .and_then(|col| col.get_value(row_idx))
393 .unwrap_or(Value::Null);
394 row.push(value);
395 }
396 row
397}
398
399fn append_chunk(chunk: &DataChunk, result: &mut QueryResult) {
400 for row_idx in chunk.selected_indices() {
401 result.rows.push(extract_row(chunk, row_idx));
402 }
403}