Skip to main content

grafeo_core/execution/
sink.rs

1//! Common sink implementations for push-based execution.
2//!
3//! Sinks receive the output from pipelines and handle the final results.
4
5use super::chunk::DataChunk;
6use super::operators::OperatorError;
7use super::pipeline::Sink;
8
9/// Collects all chunks for final query result output.
10pub struct CollectorSink {
11    chunks: Vec<DataChunk>,
12    row_count: usize,
13}
14
15impl CollectorSink {
16    /// Create a new collector sink.
17    pub fn new() -> Self {
18        Self {
19            chunks: Vec::new(),
20            row_count: 0,
21        }
22    }
23
24    /// Get the collected chunks.
25    pub fn chunks(&self) -> &[DataChunk] {
26        &self.chunks
27    }
28
29    /// Take ownership of the collected chunks.
30    pub fn into_chunks(self) -> Vec<DataChunk> {
31        self.chunks
32    }
33
34    /// Get the total row count.
35    pub fn row_count(&self) -> usize {
36        self.row_count
37    }
38
39    /// Check if any data was collected.
40    pub fn is_empty(&self) -> bool {
41        self.chunks.is_empty()
42    }
43}
44
45impl Default for CollectorSink {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl Sink for CollectorSink {
52    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
53        self.row_count += chunk.len();
54        if !chunk.is_empty() {
55            self.chunks.push(chunk);
56        }
57        Ok(true)
58    }
59
60    fn finalize(&mut self) -> Result<(), OperatorError> {
61        Ok(())
62    }
63
64    fn name(&self) -> &'static str {
65        "CollectorSink"
66    }
67
68    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
69        self
70    }
71}
72
73/// Materializing sink that buffers all data in memory.
74///
75/// Used for pipeline breakers that need to see all input before producing output.
76pub struct MaterializingSink {
77    chunks: Vec<DataChunk>,
78    row_count: usize,
79    memory_bytes: usize,
80}
81
82impl MaterializingSink {
83    /// Create a new materializing sink.
84    pub fn new() -> Self {
85        Self {
86            chunks: Vec::new(),
87            row_count: 0,
88            memory_bytes: 0,
89        }
90    }
91
92    /// Get all materialized data.
93    pub fn chunks(&self) -> &[DataChunk] {
94        &self.chunks
95    }
96
97    /// Take ownership of materialized chunks.
98    pub fn into_chunks(self) -> Vec<DataChunk> {
99        self.chunks
100    }
101
102    /// Get total row count.
103    pub fn row_count(&self) -> usize {
104        self.row_count
105    }
106
107    /// Get estimated memory usage in bytes.
108    pub fn memory_bytes(&self) -> usize {
109        self.memory_bytes
110    }
111
112    /// Merge all chunks into a single chunk.
113    pub fn into_single_chunk(self) -> DataChunk {
114        DataChunk::concat(&self.chunks)
115    }
116}
117
118impl Default for MaterializingSink {
119    fn default() -> Self {
120        Self::new()
121    }
122}
123
124impl Sink for MaterializingSink {
125    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
126        self.row_count += chunk.len();
127        // Rough estimate: each row is ~64 bytes on average
128        self.memory_bytes += chunk.len() * 64;
129        if !chunk.is_empty() {
130            self.chunks.push(chunk);
131        }
132        Ok(true)
133    }
134
135    fn finalize(&mut self) -> Result<(), OperatorError> {
136        Ok(())
137    }
138
139    fn name(&self) -> &'static str {
140        "MaterializingSink"
141    }
142
143    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
144        self
145    }
146}
147
148/// Limiting sink that stops after collecting N rows.
149///
150/// Enables early termination for LIMIT queries.
151pub struct LimitingSink {
152    inner: CollectorSink,
153    limit: usize,
154    collected: usize,
155}
156
157impl LimitingSink {
158    /// Create a new limiting sink.
159    pub fn new(limit: usize) -> Self {
160        Self {
161            inner: CollectorSink::new(),
162            limit,
163            collected: 0,
164        }
165    }
166
167    /// Get the limit.
168    pub fn limit(&self) -> usize {
169        self.limit
170    }
171
172    /// Check if limit has been reached.
173    pub fn is_full(&self) -> bool {
174        self.collected >= self.limit
175    }
176
177    /// Get collected chunks.
178    pub fn chunks(&self) -> &[DataChunk] {
179        self.inner.chunks()
180    }
181
182    /// Take ownership of collected chunks.
183    pub fn into_chunks(self) -> Vec<DataChunk> {
184        self.inner.into_chunks()
185    }
186
187    /// Get collected row count.
188    pub fn row_count(&self) -> usize {
189        self.collected
190    }
191}
192
193impl Sink for LimitingSink {
194    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
195        if self.collected >= self.limit {
196            // Already at limit, signal termination
197            return Ok(false);
198        }
199
200        let rows_needed = self.limit - self.collected;
201        let chunk_len = chunk.len();
202
203        if chunk_len <= rows_needed {
204            // Take entire chunk
205            self.collected += chunk_len;
206            self.inner.consume(chunk)?;
207        } else {
208            // Need to truncate chunk - take only rows_needed rows
209            // For now, we'll take the whole chunk but track correctly
210            // A more sophisticated implementation would slice the chunk
211            self.collected += rows_needed;
212            self.inner.consume(chunk)?;
213        }
214
215        // Signal whether to continue
216        Ok(self.collected < self.limit)
217    }
218
219    fn finalize(&mut self) -> Result<(), OperatorError> {
220        self.inner.finalize()
221    }
222
223    fn name(&self) -> &'static str {
224        "LimitingSink"
225    }
226
227    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
228        self
229    }
230}
231
/// Sink that only counts incoming rows and discards the data itself.
///
/// Useful for `COUNT(*)`-style queries where only the number of rows matters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CountingSink {
    /// Total number of rows consumed so far.
    count: usize,
}

impl CountingSink {
    /// Creates a counter starting at zero.
    #[must_use]
    pub const fn new() -> Self {
        Self { count: 0 }
    }

    /// Returns the number of rows consumed so far.
    #[must_use]
    pub const fn count(&self) -> usize {
        self.count
    }
}
256
257impl Sink for CountingSink {
258    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
259        self.count += chunk.len();
260        Ok(true)
261    }
262
263    fn finalize(&mut self) -> Result<(), OperatorError> {
264        Ok(())
265    }
266
267    fn name(&self) -> &'static str {
268        "CountingSink"
269    }
270
271    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
272        self
273    }
274}
275
/// Sink that accepts and silently drops every chunk.
///
/// Useful for dry runs and tests where the pipeline output is irrelevant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct NullSink;

impl NullSink {
    /// Creates a null sink.
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
293
294impl Sink for NullSink {
295    fn consume(&mut self, _chunk: DataChunk) -> Result<bool, OperatorError> {
296        Ok(true)
297    }
298
299    fn finalize(&mut self) -> Result<(), OperatorError> {
300        Ok(())
301    }
302
303    fn name(&self) -> &'static str {
304        "NullSink"
305    }
306
307    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
308        self
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Builds a single-column chunk of `Int64` values.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let v: Vec<Value> = values.iter().map(|&i| Value::Int64(i)).collect();
        let vector = ValueVector::from_values(&v);
        DataChunk::new(vec![vector])
    }

    #[test]
    fn test_collector_sink() {
        let mut sink = CollectorSink::new();

        sink.consume(create_test_chunk(&[1, 2, 3])).unwrap();
        sink.consume(create_test_chunk(&[4, 5])).unwrap();
        sink.finalize().unwrap();

        assert_eq!(sink.row_count(), 5);
        assert_eq!(sink.chunks().len(), 2);
    }

    #[test]
    fn test_materializing_sink() {
        let mut sink = MaterializingSink::new();

        sink.consume(create_test_chunk(&[1, 2])).unwrap();
        sink.consume(create_test_chunk(&[3, 4])).unwrap();
        sink.finalize().unwrap();

        assert_eq!(sink.row_count(), 4);

        let merged = sink.into_single_chunk();
        assert_eq!(merged.len(), 4);
    }

    #[test]
    fn test_limiting_sink() {
        let mut sink = LimitingSink::new(3);

        // First chunk - 2 rows, under limit
        let should_continue = sink.consume(create_test_chunk(&[1, 2])).unwrap();
        assert!(should_continue);
        assert!(!sink.is_full());

        // Second chunk - 3 rows, would exceed limit
        let should_continue = sink.consume(create_test_chunk(&[3, 4, 5])).unwrap();
        assert!(!should_continue);
        assert!(sink.is_full());
    }

    #[test]
    fn test_limiting_sink_zero_limit() {
        let mut sink = LimitingSink::new(0);
        assert!(sink.is_full());

        // A zero limit must reject the very first chunk without storing it.
        let should_continue = sink.consume(create_test_chunk(&[1, 2])).unwrap();
        assert!(!should_continue);
        assert_eq!(sink.row_count(), 0);
        assert!(sink.chunks().is_empty());
    }

    #[test]
    fn test_limiting_sink_exact_fill() {
        let mut sink = LimitingSink::new(2);

        // Filling the limit exactly should signal termination immediately.
        let should_continue = sink.consume(create_test_chunk(&[1, 2])).unwrap();
        assert!(!should_continue);
        assert!(sink.is_full());
        assert_eq!(sink.row_count(), 2);
    }

    #[test]
    fn test_limiting_sink_caps_row_count_and_rejects_after_full() {
        let mut sink = LimitingSink::new(3);
        sink.consume(create_test_chunk(&[1, 2])).unwrap();
        sink.consume(create_test_chunk(&[3, 4, 5])).unwrap();

        // Only rows up to the limit are counted, even though the last chunk was larger.
        assert_eq!(sink.row_count(), 3);

        // Once full, further chunks are refused and not stored.
        let stored_before = sink.chunks().len();
        let should_continue = sink.consume(create_test_chunk(&[6])).unwrap();
        assert!(!should_continue);
        assert_eq!(sink.chunks().len(), stored_before);
        assert_eq!(sink.row_count(), 3);
    }

    #[test]
    fn test_counting_sink() {
        let mut sink = CountingSink::new();

        sink.consume(create_test_chunk(&[1, 2, 3])).unwrap();
        sink.consume(create_test_chunk(&[4, 5])).unwrap();
        sink.finalize().unwrap();

        assert_eq!(sink.count(), 5);
    }

    #[test]
    fn test_null_sink() {
        let mut sink = NullSink::new();

        sink.consume(create_test_chunk(&[1, 2, 3])).unwrap();
        sink.consume(create_test_chunk(&[4, 5])).unwrap();
        sink.finalize().unwrap();

        // NullSink discards everything, no assertion needed
    }

    #[test]
    fn test_collector_sink_empty() {
        let sink = CollectorSink::new();
        assert!(sink.is_empty());
        assert_eq!(sink.row_count(), 0);
        assert_eq!(sink.chunks().len(), 0);
    }

    #[test]
    fn test_collector_sink_into_chunks() {
        let mut sink = CollectorSink::new();
        sink.consume(create_test_chunk(&[1, 2])).unwrap();
        sink.consume(create_test_chunk(&[3])).unwrap();

        assert!(!sink.is_empty());
        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].len(), 2);
        assert_eq!(chunks[1].len(), 1);
    }

    #[test]
    fn test_materializing_sink_memory_bytes() {
        let mut sink = MaterializingSink::new();
        assert_eq!(sink.memory_bytes(), 0);

        sink.consume(create_test_chunk(&[1, 2, 3])).unwrap();
        assert!(sink.memory_bytes() > 0);
    }

    #[test]
    fn test_materializing_sink_into_chunks() {
        let mut sink = MaterializingSink::new();
        sink.consume(create_test_chunk(&[1, 2])).unwrap();
        sink.consume(create_test_chunk(&[3, 4])).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 2);
    }

    #[test]
    fn test_limiting_sink_limit_getter() {
        let sink = LimitingSink::new(42);
        assert_eq!(sink.limit(), 42);
        assert!(!sink.is_full());
    }

    #[test]
    fn test_limiting_sink_into_chunks() {
        let mut sink = LimitingSink::new(5);
        sink.consume(create_test_chunk(&[1, 2])).unwrap();
        sink.consume(create_test_chunk(&[3, 4])).unwrap();

        let chunks = sink.into_chunks();
        assert_eq!(chunks.len(), 2);
    }

    #[test]
    fn test_counting_sink_empty() {
        let sink = CountingSink::new();
        assert_eq!(sink.count(), 0);
    }

    #[test]
    fn test_sink_names() {
        assert_eq!(CollectorSink::new().name(), "CollectorSink");
        assert_eq!(MaterializingSink::new().name(), "MaterializingSink");
        assert_eq!(LimitingSink::new(1).name(), "LimitingSink");
        assert_eq!(CountingSink::new().name(), "CountingSink");
        assert_eq!(NullSink::new().name(), "NullSink");
    }

    #[test]
    fn test_null_sink_into_any() {
        let sink: Box<dyn Sink> = Box::new(NullSink::new());
        let any_box = sink.into_any();
        assert!(any_box.downcast::<NullSink>().is_ok());
    }

    #[test]
    fn test_collector_sink_into_any() {
        let sink: Box<dyn Sink> = Box::new(CollectorSink::new());
        let any_box = sink.into_any();
        assert!(any_box.downcast::<CollectorSink>().is_ok());
    }

    #[test]
    fn test_other_sinks_into_any() {
        let sink: Box<dyn Sink> = Box::new(MaterializingSink::new());
        assert!(sink.into_any().downcast::<MaterializingSink>().is_ok());

        let sink: Box<dyn Sink> = Box::new(LimitingSink::new(1));
        assert!(sink.into_any().downcast::<LimitingSink>().is_ok());

        let sink: Box<dyn Sink> = Box::new(CountingSink::new());
        assert!(sink.into_any().downcast::<CountingSink>().is_ok());
    }
}