1use super::chunk::DataChunk;
8use super::operators::OperatorError;
9
/// How large a chunk an operator would like its input to arrive in.
///
/// A [`Pipeline`] folds the hints of all of its operators into the single
/// chunk size it requests from its [`Source`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ChunkSizeHint {
    /// No preference; this is also the [`Default`] value.
    #[default]
    Default,
    /// Prefers small chunks (see [`SMALL_CHUNK_SIZE`]).
    Small,
    /// Prefers large chunks (see [`LARGE_CHUNK_SIZE`]).
    Large,
    /// Asks for chunks of exactly this size.
    Exact(usize),
    /// Asks for chunks no larger than this size.
    AtMost(usize),
}
32
/// Chunk size requested when no operator expresses a preference.
pub const DEFAULT_CHUNK_SIZE: usize = 2_048;

/// Chunk size associated with [`ChunkSizeHint::Small`].
pub const SMALL_CHUNK_SIZE: usize = 512;

/// Chunk size associated with [`ChunkSizeHint::Large`].
pub const LARGE_CHUNK_SIZE: usize = 4_096;
41
42pub trait Source: Send + Sync {
46 fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError>;
50
51 fn reset(&mut self);
53
54 fn name(&self) -> &'static str;
56}
57
58pub trait Sink: Send + Sync {
62 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError>;
66
67 fn finalize(&mut self) -> Result<(), OperatorError>;
69
70 fn name(&self) -> &'static str;
72}
73
74pub trait PushOperator: Send + Sync {
80 fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError>;
84
85 fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError>;
89
90 fn preferred_chunk_size(&self) -> ChunkSizeHint {
92 ChunkSizeHint::Default
93 }
94
95 fn name(&self) -> &'static str;
97}
98
99pub struct Pipeline {
101 source: Box<dyn Source>,
102 operators: Vec<Box<dyn PushOperator>>,
103 sink: Box<dyn Sink>,
104}
105
106impl Pipeline {
107 pub fn new(
109 source: Box<dyn Source>,
110 operators: Vec<Box<dyn PushOperator>>,
111 sink: Box<dyn Sink>,
112 ) -> Self {
113 Self {
114 source,
115 operators,
116 sink,
117 }
118 }
119
120 pub fn simple(source: Box<dyn Source>, sink: Box<dyn Sink>) -> Self {
122 Self {
123 source,
124 operators: Vec::new(),
125 sink,
126 }
127 }
128
129 #[must_use]
131 pub fn with_operator(mut self, op: Box<dyn PushOperator>) -> Self {
132 self.operators.push(op);
133 self
134 }
135
136 pub fn execute(&mut self) -> Result<(), OperatorError> {
138 let chunk_size = self.compute_chunk_size();
139
140 while let Some(chunk) = self.source.next_chunk(chunk_size)? {
142 if !self.push_through(chunk)? {
143 break;
145 }
146 }
147
148 self.finalize_all()
150 }
151
152 fn compute_chunk_size(&self) -> usize {
154 let mut size = DEFAULT_CHUNK_SIZE;
155
156 for op in &self.operators {
157 match op.preferred_chunk_size() {
158 ChunkSizeHint::Default => {}
159 ChunkSizeHint::Small => size = size.min(SMALL_CHUNK_SIZE),
160 ChunkSizeHint::Large => size = size.max(LARGE_CHUNK_SIZE),
161 ChunkSizeHint::Exact(s) => return s,
162 ChunkSizeHint::AtMost(s) => size = size.min(s),
163 }
164 }
165
166 size
167 }
168
169 fn push_through(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
171 if self.operators.is_empty() {
172 return self.sink.consume(chunk);
174 }
175
176 let mut current_chunk = chunk;
178 let num_operators = self.operators.len();
179
180 for i in 0..num_operators {
181 let is_last = i == num_operators - 1;
182
183 if is_last {
184 return self.operators[i].push(current_chunk, &mut *self.sink);
186 }
187
188 let mut collector = ChunkCollector::new();
190 let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
191
192 if !continue_processing || collector.is_empty() {
193 return Ok(continue_processing);
194 }
195
196 current_chunk = collector.into_single_chunk();
198 }
199
200 Ok(true)
201 }
202
203 fn finalize_all(&mut self) -> Result<(), OperatorError> {
205 if self.operators.is_empty() {
209 return self.sink.finalize();
210 }
211
212 for i in 0..self.operators.len() {
214 let is_last = i == self.operators.len() - 1;
215
216 if is_last {
217 self.operators[i].finalize(&mut *self.sink)?;
218 } else {
219 let mut collector = ChunkCollector::new();
221 self.operators[i].finalize(&mut collector)?;
222
223 for chunk in collector.into_chunks() {
224 self.push_through_from(chunk, i + 1)?;
226 }
227 }
228 }
229
230 self.sink.finalize()
231 }
232
233 fn push_through_from(&mut self, chunk: DataChunk, start: usize) -> Result<bool, OperatorError> {
235 let mut current_chunk = chunk;
236
237 for i in start..self.operators.len() {
238 let is_last = i == self.operators.len() - 1;
239
240 if is_last {
241 return self.operators[i].push(current_chunk, &mut *self.sink);
242 }
243
244 let mut collector = ChunkCollector::new();
245 let continue_processing = self.operators[i].push(current_chunk, &mut collector)?;
246
247 if !continue_processing || collector.is_empty() {
248 return Ok(continue_processing);
249 }
250
251 current_chunk = collector.into_single_chunk();
252 }
253
254 self.sink.consume(current_chunk)
255 }
256}
257
258pub struct ChunkCollector {
260 chunks: Vec<DataChunk>,
261}
262
263impl ChunkCollector {
264 pub fn new() -> Self {
266 Self { chunks: Vec::new() }
267 }
268
269 pub fn is_empty(&self) -> bool {
271 self.chunks.is_empty()
272 }
273
274 pub fn row_count(&self) -> usize {
276 self.chunks.iter().map(DataChunk::len).sum()
277 }
278
279 pub fn into_chunks(self) -> Vec<DataChunk> {
281 self.chunks
282 }
283
284 pub fn into_single_chunk(self) -> DataChunk {
286 if self.chunks.is_empty() {
287 return DataChunk::empty();
288 }
289 if self.chunks.len() == 1 {
290 return self
292 .chunks
293 .into_iter()
294 .next()
295 .expect("chunks has exactly one element: checked on previous line");
296 }
297
298 DataChunk::concat(&self.chunks)
300 }
301}
302
303impl Default for ChunkCollector {
304 fn default() -> Self {
305 Self::new()
306 }
307}
308
309impl Sink for ChunkCollector {
310 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
311 if !chunk.is_empty() {
312 self.chunks.push(chunk);
313 }
314 Ok(true)
315 }
316
317 fn finalize(&mut self) -> Result<(), OperatorError> {
318 Ok(())
319 }
320
321 fn name(&self) -> &'static str {
322 "ChunkCollector"
323 }
324}
325
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use graphos_common::types::Value;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Builds a single-column chunk holding `0..rows` as `Int64` values.
    fn int_chunk(rows: usize) -> DataChunk {
        let values: Vec<Value> = (0..rows).map(|i| Value::Int64(i as i64)).collect();
        DataChunk::new(vec![ValueVector::from_values(&values)])
    }

    /// Yields `remaining` chunks of `values_per_chunk` rows each and ignores
    /// the requested chunk size.
    struct TestSource {
        remaining: usize,
        values_per_chunk: usize,
    }

    impl TestSource {
        fn new(num_chunks: usize, values_per_chunk: usize) -> Self {
            Self {
                remaining: num_chunks,
                values_per_chunk,
            }
        }
    }

    impl Source for TestSource {
        fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
            if self.remaining == 0 {
                return Ok(None);
            }
            self.remaining -= 1;
            Ok(Some(int_chunk(self.values_per_chunk)))
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "TestSource"
        }
    }

    /// Shared record of what a [`TestSink`] received. It stays readable after
    /// the sink has been boxed and moved into a pipeline.
    #[derive(Clone, Default)]
    struct SinkProbe {
        chunks: Arc<AtomicUsize>,
        rows: Arc<AtomicUsize>,
        finalized: Arc<AtomicBool>,
    }

    impl SinkProbe {
        fn chunks(&self) -> usize {
            self.chunks.load(Ordering::SeqCst)
        }

        fn rows(&self) -> usize {
            self.rows.load(Ordering::SeqCst)
        }

        fn finalized(&self) -> bool {
            self.finalized.load(Ordering::SeqCst)
        }
    }

    /// Sink that records chunk count, row count and finalization into a [`SinkProbe`].
    struct TestSink {
        probe: SinkProbe,
    }

    impl TestSink {
        /// Returns the sink together with a probe for inspecting it later.
        fn with_probe() -> (Self, SinkProbe) {
            let probe = SinkProbe::default();
            (
                Self {
                    probe: probe.clone(),
                },
                probe,
            )
        }
    }

    impl Sink for TestSink {
        fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
            self.probe.chunks.fetch_add(1, Ordering::SeqCst);
            self.probe.rows.fetch_add(chunk.len(), Ordering::SeqCst);
            Ok(true)
        }

        fn finalize(&mut self) -> Result<(), OperatorError> {
            self.probe.finalized.store(true, Ordering::SeqCst);
            Ok(())
        }

        fn name(&self) -> &'static str {
            "TestSink"
        }
    }

    struct PassThroughOperator;

    impl PushOperator for PassThroughOperator {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    /// Pass-through operator that reports a fixed chunk-size hint.
    struct HintOp(ChunkSizeHint);

    impl PushOperator for HintOp {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn preferred_chunk_size(&self) -> ChunkSizeHint {
            self.0
        }

        fn name(&self) -> &'static str {
            "HintOp"
        }
    }

    /// Chunk size a pipeline computes for operators carrying `hints`, in order.
    fn chunk_size_for(hints: &[ChunkSizeHint]) -> usize {
        let (sink, _probe) = TestSink::with_probe();
        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(0, 0)), Box::new(sink));
        for &hint in hints {
            pipeline = pipeline.with_operator(Box::new(HintOp(hint)));
        }
        pipeline.compute_chunk_size()
    }

    #[test]
    fn test_simple_pipeline() {
        let (sink, probe) = TestSink::with_probe();
        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(3, 10)), Box::new(sink));
        pipeline.execute().unwrap();

        assert_eq!(probe.chunks(), 3);
        assert_eq!(probe.rows(), 30);
        assert!(probe.finalized());
    }

    #[test]
    fn test_pipeline_with_operator() {
        let (sink, probe) = TestSink::with_probe();
        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(2, 5)), Box::new(sink))
            .with_operator(Box::new(PassThroughOperator));
        pipeline.execute().unwrap();

        assert_eq!(probe.chunks(), 2);
        assert_eq!(probe.rows(), 10);
        assert!(probe.finalized());
    }

    #[test]
    fn test_pipeline_with_chained_operators() {
        // The first operator's output travels through an intermediate ChunkCollector.
        let (sink, probe) = TestSink::with_probe();
        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(2, 5)), Box::new(sink))
            .with_operator(Box::new(PassThroughOperator))
            .with_operator(Box::new(PassThroughOperator));
        pipeline.execute().unwrap();

        assert_eq!(probe.chunks(), 2);
        assert_eq!(probe.rows(), 10);
        assert!(probe.finalized());
    }

    #[test]
    fn test_chunk_collector() {
        let mut collector = ChunkCollector::new();
        assert!(collector.is_empty());
        assert_eq!(collector.row_count(), 0);

        collector.consume(int_chunk(2)).unwrap();
        assert!(!collector.is_empty());
        assert_eq!(collector.row_count(), 2);

        let merged = collector.into_single_chunk();
        assert_eq!(merged.len(), 2);
    }

    #[test]
    fn test_chunk_collector_merges_multiple_chunks() {
        let mut collector = ChunkCollector::new();
        collector.consume(int_chunk(2)).unwrap();
        collector.consume(int_chunk(3)).unwrap();
        assert_eq!(collector.row_count(), 5);

        let merged = collector.into_single_chunk();
        assert_eq!(merged.len(), 5);
    }

    #[test]
    fn test_chunk_size_hints() {
        assert_eq!(ChunkSizeHint::default(), ChunkSizeHint::Default);

        assert_eq!(chunk_size_for(&[]), DEFAULT_CHUNK_SIZE);
        assert_eq!(chunk_size_for(&[ChunkSizeHint::Default]), DEFAULT_CHUNK_SIZE);
        assert!(chunk_size_for(&[ChunkSizeHint::Small]) <= SMALL_CHUNK_SIZE);
        assert!(chunk_size_for(&[ChunkSizeHint::Large]) >= LARGE_CHUNK_SIZE);
        assert_eq!(chunk_size_for(&[ChunkSizeHint::AtMost(100)]), 100);
        assert_eq!(chunk_size_for(&[ChunkSizeHint::Exact(7)]), 7);
    }
}