1use std::time::Instant;
8
9use super::chunk::DataChunk;
10use super::operators::OperatorError;
11
/// Chunk size an operator would like to receive from the pipeline's source.
///
/// A [`Pipeline`] combines the hints of all of its operators into the single
/// chunk size it requests from its [`Source`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum ChunkSizeHint {
    /// No preference (the pipeline falls back to `DEFAULT_CHUNK_SIZE`).
    #[default]
    Default,
    /// Prefer small chunks (see `SMALL_CHUNK_SIZE`).
    Small,
    /// Prefer large chunks (see `LARGE_CHUNK_SIZE`).
    Large,
    /// Request exactly this chunk size.
    Exact(usize),
    /// Request a chunk size no larger than this.
    AtMost(usize),
}
35
/// Chunk size requested from the source when no operator expresses a preference.
pub const DEFAULT_CHUNK_SIZE: usize = 2_048;

/// Chunk size associated with [`ChunkSizeHint::Small`].
pub const SMALL_CHUNK_SIZE: usize = 512;

/// Chunk size associated with [`ChunkSizeHint::Large`].
pub const LARGE_CHUNK_SIZE: usize = 4_096;
44
/// Producer of the data chunks that feed a [`Pipeline`].
pub trait Source: Send + Sync {
    /// Returns the next chunk, or `Ok(None)` once the source is exhausted
    /// (`Pipeline::execute` stops pulling at the first `None`).
    ///
    /// `chunk_size` is the size the pipeline computed from its operators'
    /// hints. NOTE(review): whether implementations must honor it exactly is
    /// not enforced here — treat it as a request.
    ///
    /// # Errors
    ///
    /// Any error is propagated unchanged out of `Pipeline::execute`.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError>;

    /// Resets the source. NOTE(review): presumably so it can be read again
    /// from the start — not exercised in this module, confirm semantics.
    fn reset(&mut self);

    /// Short, static name of the source (presumably for diagnostics).
    fn name(&self) -> &'static str;
}
64
/// Consumer of chunks: the final destination of a [`Pipeline`], and also the
/// output target an operator writes into.
pub trait Sink: Send + Sync {
    /// Accepts one chunk.
    ///
    /// Returning `Ok(false)` tells the caller that no more input is needed;
    /// `Ok(true)` asks for more. The pipeline stops pulling from its source
    /// when this signal propagates back to it.
    ///
    /// # Errors
    ///
    /// Any error aborts pipeline execution.
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError>;

    /// Completes consumption. `Pipeline` calls this after every operator
    /// has been finalized.
    ///
    /// # Errors
    ///
    /// Any error is returned from `Pipeline::execute`.
    fn finalize(&mut self) -> Result<(), OperatorError>;

    /// Short, static name of the sink (presumably for diagnostics).
    fn name(&self) -> &'static str;

    /// Converts the boxed sink into `Box<dyn Any>` so a caller can downcast it
    /// back to its concrete type (e.g. after `Pipeline::into_sink`).
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any>;
}
91
/// A push-based operator: receives chunks and writes its output into a
/// downstream [`Sink`].
pub trait PushOperator: Send + Sync {
    /// Processes `chunk`, writing any output into `sink`.
    ///
    /// Returning `Ok(false)` signals that this operator needs no further
    /// input; the pipeline then stops pulling from its source.
    ///
    /// # Errors
    ///
    /// Any error aborts pipeline execution.
    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError>;

    /// Called after input ends — whether the source was exhausted or the
    /// pipeline stopped early. The operator may write remaining output into
    /// `sink`; the pipeline forwards it to the operators that follow.
    ///
    /// # Errors
    ///
    /// Any error is returned from `Pipeline::execute`.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError>;

    /// Chunk size this operator would like to receive. The pipeline combines
    /// the hints of all its operators; the default expresses no preference.
    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    /// Short, static name of the operator (presumably for diagnostics).
    fn name(&self) -> &'static str;
}
124
/// A linear push pipeline: chunks are pulled from `source`, pushed through
/// `operators` in order, and the final output lands in `sink`.
pub struct Pipeline {
    /// Where input chunks are pulled from.
    source: Box<dyn Source>,
    /// Operators applied in order; the last one writes directly into `sink`.
    operators: Vec<Box<dyn PushOperator>>,
    /// Destination of the last operator's output (or of the source's output
    /// when there are no operators).
    sink: Box<dyn Sink>,
    /// Optional instant after which execution fails with a timeout error.
    deadline: Option<Instant>,
}
133
134impl Pipeline {
135 pub fn new(
137 source: Box<dyn Source>,
138 operators: Vec<Box<dyn PushOperator>>,
139 sink: Box<dyn Sink>,
140 ) -> Self {
141 Self {
142 source,
143 operators,
144 sink,
145 deadline: None,
146 }
147 }
148
149 pub fn simple(source: Box<dyn Source>, sink: Box<dyn Sink>) -> Self {
151 Self {
152 source,
153 operators: Vec::new(),
154 sink,
155 deadline: None,
156 }
157 }
158
159 #[must_use]
161 pub fn with_operator(mut self, op: Box<dyn PushOperator>) -> Self {
162 self.operators.push(op);
163 self
164 }
165
166 #[must_use]
171 pub fn with_deadline(mut self, deadline: Option<Instant>) -> Self {
172 self.deadline = deadline;
173 self
174 }
175
176 pub fn set_deadline(&mut self, deadline: Option<Instant>) {
178 self.deadline = deadline;
179 }
180
181 fn check_deadline(&self) -> Result<(), OperatorError> {
185 #[cfg(not(target_arch = "wasm32"))]
186 if let Some(deadline) = self.deadline
187 && Instant::now() >= deadline
188 {
189 return Err(OperatorError::Execution(
190 "Query exceeded timeout".to_string(),
191 ));
192 }
193 Ok(())
194 }
195
196 pub fn into_sink(self) -> Box<dyn Sink> {
202 self.sink
203 }
204
205 pub fn execute(&mut self) -> Result<(), OperatorError> {
220 let chunk_size = self.compute_chunk_size();
221
222 while let Some(chunk) = self.source.next_chunk(chunk_size)? {
224 self.check_deadline()?;
225
226 if !self.push_through(chunk)? {
227 break;
229 }
230 }
231
232 self.finalize_all()
234 }
235
236 fn compute_chunk_size(&self) -> usize {
238 let mut size = DEFAULT_CHUNK_SIZE;
239
240 for op in &self.operators {
241 match op.preferred_chunk_size() {
242 ChunkSizeHint::Default => {}
243 ChunkSizeHint::Small => size = size.min(SMALL_CHUNK_SIZE),
244 ChunkSizeHint::Large => size = size.max(LARGE_CHUNK_SIZE),
245 ChunkSizeHint::Exact(s) => return s,
246 ChunkSizeHint::AtMost(s) => size = size.min(s),
247 }
248 }
249
250 size
251 }
252
253 fn push_through(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
255 if self.operators.is_empty() {
256 return self.sink.consume(chunk);
258 }
259
260 let mut current_chunk = chunk;
262 let num_operators = self.operators.len();
263
264 for i in 0..num_operators {
265 let is_last = i == num_operators - 1;
266
267 if is_last {
268 return self.operators[i].push(current_chunk, &mut *self.sink);
270 }
271
272 let mut collector = ChunkCollector::new();
274 let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
275
276 if !continue_processing || collector.is_empty() {
277 return Ok(continue_processing);
278 }
279
280 current_chunk = collector.into_single_chunk();
282 }
283
284 Ok(true)
285 }
286
287 fn finalize_all(&mut self) -> Result<(), OperatorError> {
289 if self.operators.is_empty() {
293 return self.sink.finalize();
294 }
295
296 for i in 0..self.operators.len() {
298 let is_last = i == self.operators.len() - 1;
299
300 if is_last {
301 self.operators[i].finalize(&mut *self.sink)?;
302 } else {
303 let mut collector = ChunkCollector::new();
305 self.operators[i].finalize(&mut collector)?;
306
307 for chunk in collector.into_chunks() {
308 self.push_through_from(chunk, i + 1)?;
310 }
311 }
312 }
313
314 self.sink.finalize()
315 }
316
317 fn push_through_from(&mut self, chunk: DataChunk, start: usize) -> Result<bool, OperatorError> {
319 let mut current_chunk = chunk;
320
321 for i in start..self.operators.len() {
322 let is_last = i == self.operators.len() - 1;
323
324 if is_last {
325 return self.operators[i].push(current_chunk, &mut *self.sink);
326 }
327
328 let mut collector = ChunkCollector::new();
329 let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
330
331 if !continue_processing || collector.is_empty() {
332 return Ok(continue_processing);
333 }
334
335 current_chunk = collector.into_single_chunk();
336 }
337
338 self.sink.consume(current_chunk)
339 }
340}
341
/// An in-memory [`Sink`] that buffers every non-empty chunk it receives.
///
/// `Pipeline` uses it to capture one operator's output before handing that
/// output to the next operator.
pub struct ChunkCollector {
    /// Buffered chunks, in the order they were consumed.
    chunks: Vec<DataChunk>,
}
346
347impl ChunkCollector {
348 pub fn new() -> Self {
350 Self { chunks: Vec::new() }
351 }
352
353 pub fn is_empty(&self) -> bool {
355 self.chunks.is_empty()
356 }
357
358 pub fn row_count(&self) -> usize {
360 self.chunks.iter().map(DataChunk::len).sum()
361 }
362
363 pub fn into_chunks(self) -> Vec<DataChunk> {
365 self.chunks
366 }
367
368 pub fn into_single_chunk(self) -> DataChunk {
374 if self.chunks.is_empty() {
375 return DataChunk::empty();
376 }
377 if self.chunks.len() == 1 {
378 return self
380 .chunks
381 .into_iter()
382 .next()
383 .expect("chunks has exactly one element: checked on previous line");
384 }
385
386 DataChunk::concat(&self.chunks)
388 }
389}
390
391impl Default for ChunkCollector {
392 fn default() -> Self {
393 Self::new()
394 }
395}
396
397impl Sink for ChunkCollector {
398 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
399 if !chunk.is_empty() {
400 self.chunks.push(chunk);
401 }
402 Ok(true)
403 }
404
405 fn finalize(&mut self) -> Result<(), OperatorError> {
406 Ok(())
407 }
408
409 fn name(&self) -> &'static str {
410 "ChunkCollector"
411 }
412
413 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
414 self
415 }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Source yielding `remaining` chunks. Each chunk has a single `Int64`
    /// column holding `0..values_per_chunk`. The requested chunk size is ignored.
    struct TestSource {
        remaining: usize,
        values_per_chunk: usize,
    }

    impl TestSource {
        fn new(num_chunks: usize, values_per_chunk: usize) -> Self {
            Self {
                remaining: num_chunks,
                values_per_chunk,
            }
        }
    }

    impl Source for TestSource {
        fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
            if self.remaining == 0 {
                return Ok(None);
            }
            self.remaining -= 1;

            // Test sizes are tiny, so the usize -> i64 cast cannot wrap.
            #[allow(clippy::cast_possible_wrap)]
            let values: Vec<Value> = (0..self.values_per_chunk)
                .map(|i| Value::Int64(i as i64))
                .collect();
            let vector = ValueVector::from_values(&values);
            let chunk = DataChunk::new(vec![vector]);
            Ok(Some(chunk))
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "TestSource"
        }
    }

    /// Sink that records every chunk it receives and whether it was finalized.
    struct TestSink {
        chunks: Vec<DataChunk>,
        finalized: bool,
    }

    impl TestSink {
        fn new() -> Self {
            Self {
                chunks: Vec::new(),
                finalized: false,
            }
        }
    }

    impl Sink for TestSink {
        fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
            self.chunks.push(chunk);
            Ok(true)
        }

        fn finalize(&mut self) -> Result<(), OperatorError> {
            self.finalized = true;
            Ok(())
        }

        fn name(&self) -> &'static str {
            "TestSink"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
            self
        }
    }

    /// Operator that forwards every chunk unchanged.
    struct PassThroughOperator;

    impl PushOperator for PassThroughOperator {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    /// Pass-through operator reporting a fixed chunk-size hint.
    struct HintOp(ChunkSizeHint);

    impl PushOperator for HintOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn preferred_chunk_size(&self) -> ChunkSizeHint {
            self.0
        }

        fn name(&self) -> &'static str {
            "HintOp"
        }
    }

    /// Runs `pipeline` to completion and recovers its concrete `TestSink`.
    fn run_into_test_sink(mut pipeline: Pipeline) -> Box<TestSink> {
        pipeline.execute().unwrap();
        pipeline
            .into_sink()
            .into_any()
            .downcast::<TestSink>()
            .expect("pipeline sink should be a TestSink")
    }

    /// Builds a chunk with one `Int64` column holding `0..len`.
    fn int_chunk(len: i64) -> DataChunk {
        let values: Vec<Value> = (0..len).map(Value::Int64).collect();
        DataChunk::new(vec![ValueVector::from_values(&values)])
    }

    /// Chunk size a pipeline computes for operators with the given hints.
    fn chunk_size_for(hints: &[ChunkSizeHint]) -> usize {
        let source = Box::new(TestSource::new(0, 0));
        let sink = Box::new(TestSink::new());
        let mut pipeline = Pipeline::simple(source, sink);
        for &hint in hints {
            pipeline = pipeline.with_operator(Box::new(HintOp(hint)));
        }
        pipeline.compute_chunk_size()
    }

    #[test]
    fn test_simple_pipeline() {
        let source = Box::new(TestSource::new(3, 10));
        let sink = Box::new(TestSink::new());

        let sink = run_into_test_sink(Pipeline::simple(source, sink));

        assert_eq!(sink.chunks.len(), 3);
        assert!(sink.chunks.iter().all(|chunk| chunk.len() == 10));
        assert!(sink.finalized);
    }

    #[test]
    fn test_pipeline_with_operator() {
        let source = Box::new(TestSource::new(2, 5));
        let sink = Box::new(TestSink::new());

        let pipeline =
            Pipeline::simple(source, sink).with_operator(Box::new(PassThroughOperator));
        let sink = run_into_test_sink(pipeline);

        assert_eq!(sink.chunks.len(), 2);
        assert!(sink.chunks.iter().all(|chunk| chunk.len() == 5));
        assert!(sink.finalized);
    }

    #[test]
    fn test_chunk_collector() {
        let mut collector = ChunkCollector::new();
        assert!(collector.is_empty());

        let values: Vec<Value> = vec![Value::Int64(1), Value::Int64(2)];
        let vector = ValueVector::from_values(&values);
        let chunk = DataChunk::new(vec![vector]);

        collector.consume(chunk).unwrap();
        assert!(!collector.is_empty());
        assert_eq!(collector.row_count(), 2);

        let merged = collector.into_single_chunk();
        assert_eq!(merged.len(), 2);
    }

    #[test]
    fn test_chunk_collector_skips_empty_chunks() {
        let mut collector = ChunkCollector::new();
        assert!(collector.consume(DataChunk::empty()).unwrap());
        assert!(collector.is_empty());
        assert_eq!(collector.row_count(), 0);
    }

    #[test]
    fn test_chunk_collector_merges_multiple_chunks() {
        let mut collector = ChunkCollector::new();
        collector.consume(int_chunk(2)).unwrap();
        collector.consume(int_chunk(3)).unwrap();
        assert_eq!(collector.row_count(), 5);
        assert_eq!(collector.into_single_chunk().len(), 5);

        let mut collector = ChunkCollector::new();
        collector.consume(int_chunk(2)).unwrap();
        collector.consume(int_chunk(3)).unwrap();
        let chunks = collector.into_chunks();
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].len(), 2);
        assert_eq!(chunks[1].len(), 3);
    }

    #[test]
    fn test_pipeline_deadline_expired() {
        use std::time::{Duration, Instant};

        let source = Box::new(TestSource::new(10, 5));
        let sink = Box::new(TestSink::new());

        let expired = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        let mut pipeline = Pipeline::simple(source, sink).with_deadline(Some(expired));

        let result = pipeline.execute();
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_pipeline_future_deadline_completes() {
        use std::time::{Duration, Instant};

        let source = Box::new(TestSource::new(3, 5));
        let sink = Box::new(TestSink::new());
        let deadline = Instant::now() + Duration::from_secs(3600);

        let pipeline = Pipeline::simple(source, sink).with_deadline(Some(deadline));
        let sink = run_into_test_sink(pipeline);

        assert_eq!(sink.chunks.len(), 3);
        assert!(sink.finalized);
    }

    #[test]
    fn test_pipeline_no_deadline() {
        let source = Box::new(TestSource::new(3, 5));
        let sink = Box::new(TestSink::new());

        let pipeline = Pipeline::simple(source, sink).with_deadline(None);
        let sink = run_into_test_sink(pipeline);

        assert_eq!(sink.chunks.len(), 3);
        assert!(sink.finalized);
    }

    #[test]
    fn test_pipeline_set_deadline() {
        use std::time::{Duration, Instant};

        let source = Box::new(TestSource::new(10, 5));
        let sink = Box::new(TestSink::new());

        let mut pipeline = Pipeline::simple(source, sink);

        let expired = Instant::now().checked_sub(Duration::from_secs(1)).unwrap();
        pipeline.set_deadline(Some(expired));

        let result = pipeline.execute();
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(
            err.to_string().contains("Query exceeded timeout"),
            "Expected timeout error, got: {err}"
        );
    }

    #[test]
    fn test_chunk_size_hints() {
        assert_eq!(ChunkSizeHint::default(), ChunkSizeHint::Default);

        let source = Box::new(TestSource::new(1, 10));
        let sink = Box::new(TestSink::new());

        struct SmallHintOp;

        impl PushOperator for SmallHintOp {
            fn push(
                &mut self,
                chunk: DataChunk,
                sink: &mut dyn Sink,
            ) -> Result<bool, OperatorError> {
                sink.consume(chunk)
            }
            fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
                Ok(())
            }
            fn preferred_chunk_size(&self) -> ChunkSizeHint {
                ChunkSizeHint::Small
            }
            fn name(&self) -> &'static str {
                "SmallHint"
            }
        }

        let pipeline = Pipeline::simple(source, sink).with_operator(Box::new(SmallHintOp));

        let computed_size = pipeline.compute_chunk_size();
        assert!(computed_size <= SMALL_CHUNK_SIZE);
    }

    #[test]
    fn test_chunk_size_single_hints() {
        assert_eq!(chunk_size_for(&[]), DEFAULT_CHUNK_SIZE);
        assert_eq!(chunk_size_for(&[ChunkSizeHint::Default]), DEFAULT_CHUNK_SIZE);
        assert_eq!(chunk_size_for(&[ChunkSizeHint::Small]), SMALL_CHUNK_SIZE);
        assert_eq!(chunk_size_for(&[ChunkSizeHint::Large]), LARGE_CHUNK_SIZE);
        assert_eq!(chunk_size_for(&[ChunkSizeHint::Exact(7)]), 7);
        assert_eq!(chunk_size_for(&[ChunkSizeHint::AtMost(100)]), 100);
    }
}