Skip to main content

grafeo_core/execution/
sink.rs

1//! Common sink implementations for push-based execution.
2//!
3//! Sinks receive the output from pipelines and handle the final results.
4
5use super::chunk::DataChunk;
6use super::operators::OperatorError;
7use super::pipeline::Sink;
8
9/// Collects all chunks for final query result output.
10pub struct CollectorSink {
11    chunks: Vec<DataChunk>,
12    row_count: usize,
13}
14
15impl CollectorSink {
16    /// Create a new collector sink.
17    pub fn new() -> Self {
18        Self {
19            chunks: Vec::new(),
20            row_count: 0,
21        }
22    }
23
24    /// Get the collected chunks.
25    pub fn chunks(&self) -> &[DataChunk] {
26        &self.chunks
27    }
28
29    /// Take ownership of the collected chunks.
30    pub fn into_chunks(self) -> Vec<DataChunk> {
31        self.chunks
32    }
33
34    /// Get the total row count.
35    pub fn row_count(&self) -> usize {
36        self.row_count
37    }
38
39    /// Check if any data was collected.
40    pub fn is_empty(&self) -> bool {
41        self.chunks.is_empty()
42    }
43}
44
45impl Default for CollectorSink {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl Sink for CollectorSink {
52    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
53        self.row_count += chunk.len();
54        if !chunk.is_empty() {
55            self.chunks.push(chunk);
56        }
57        Ok(true)
58    }
59
60    fn finalize(&mut self) -> Result<(), OperatorError> {
61        Ok(())
62    }
63
64    fn name(&self) -> &'static str {
65        "CollectorSink"
66    }
67}
68
69/// Materializing sink that buffers all data in memory.
70///
71/// Used for pipeline breakers that need to see all input before producing output.
72pub struct MaterializingSink {
73    chunks: Vec<DataChunk>,
74    row_count: usize,
75    memory_bytes: usize,
76}
77
78impl MaterializingSink {
79    /// Create a new materializing sink.
80    pub fn new() -> Self {
81        Self {
82            chunks: Vec::new(),
83            row_count: 0,
84            memory_bytes: 0,
85        }
86    }
87
88    /// Get all materialized data.
89    pub fn chunks(&self) -> &[DataChunk] {
90        &self.chunks
91    }
92
93    /// Take ownership of materialized chunks.
94    pub fn into_chunks(self) -> Vec<DataChunk> {
95        self.chunks
96    }
97
98    /// Get total row count.
99    pub fn row_count(&self) -> usize {
100        self.row_count
101    }
102
103    /// Get estimated memory usage in bytes.
104    pub fn memory_bytes(&self) -> usize {
105        self.memory_bytes
106    }
107
108    /// Merge all chunks into a single chunk.
109    pub fn into_single_chunk(self) -> DataChunk {
110        DataChunk::concat(&self.chunks)
111    }
112}
113
114impl Default for MaterializingSink {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
120impl Sink for MaterializingSink {
121    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
122        self.row_count += chunk.len();
123        // Rough estimate: each row is ~64 bytes on average
124        self.memory_bytes += chunk.len() * 64;
125        if !chunk.is_empty() {
126            self.chunks.push(chunk);
127        }
128        Ok(true)
129    }
130
131    fn finalize(&mut self) -> Result<(), OperatorError> {
132        Ok(())
133    }
134
135    fn name(&self) -> &'static str {
136        "MaterializingSink"
137    }
138}
139
140/// Limiting sink that stops after collecting N rows.
141///
142/// Enables early termination for LIMIT queries.
143pub struct LimitingSink {
144    inner: CollectorSink,
145    limit: usize,
146    collected: usize,
147}
148
149impl LimitingSink {
150    /// Create a new limiting sink.
151    pub fn new(limit: usize) -> Self {
152        Self {
153            inner: CollectorSink::new(),
154            limit,
155            collected: 0,
156        }
157    }
158
159    /// Get the limit.
160    pub fn limit(&self) -> usize {
161        self.limit
162    }
163
164    /// Check if limit has been reached.
165    pub fn is_full(&self) -> bool {
166        self.collected >= self.limit
167    }
168
169    /// Get collected chunks.
170    pub fn chunks(&self) -> &[DataChunk] {
171        self.inner.chunks()
172    }
173
174    /// Take ownership of collected chunks.
175    pub fn into_chunks(self) -> Vec<DataChunk> {
176        self.inner.into_chunks()
177    }
178
179    /// Get collected row count.
180    pub fn row_count(&self) -> usize {
181        self.collected
182    }
183}
184
185impl Sink for LimitingSink {
186    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
187        if self.collected >= self.limit {
188            // Already at limit, signal termination
189            return Ok(false);
190        }
191
192        let rows_needed = self.limit - self.collected;
193        let chunk_len = chunk.len();
194
195        if chunk_len <= rows_needed {
196            // Take entire chunk
197            self.collected += chunk_len;
198            self.inner.consume(chunk)?;
199        } else {
200            // Need to truncate chunk - take only rows_needed rows
201            // For now, we'll take the whole chunk but track correctly
202            // A more sophisticated implementation would slice the chunk
203            self.collected += rows_needed;
204            self.inner.consume(chunk)?;
205        }
206
207        // Signal whether to continue
208        Ok(self.collected < self.limit)
209    }
210
211    fn finalize(&mut self) -> Result<(), OperatorError> {
212        self.inner.finalize()
213    }
214
215    fn name(&self) -> &'static str {
216        "LimitingSink"
217    }
218}
219
/// Sink that tallies rows and stores no data.
///
/// Useful for COUNT(*) queries where only the count matters. The default
/// value (also produced by [`CountingSink::new`]) starts at zero.
#[derive(Default)]
pub struct CountingSink {
    /// Number of rows counted so far.
    count: usize,
}

impl CountingSink {
    /// Creates a counting sink whose tally starts at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the number of rows counted so far.
    pub fn count(&self) -> usize {
        let Self { count } = self;
        *count
    }
}
244
245impl Sink for CountingSink {
246    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
247        self.count += chunk.len();
248        Ok(true)
249    }
250
251    fn finalize(&mut self) -> Result<(), OperatorError> {
252        Ok(())
253    }
254
255    fn name(&self) -> &'static str {
256        "CountingSink"
257    }
258}
259
/// Sink that throws away everything it receives.
///
/// Useful for dry-run or testing purposes. It is a unit struct and
/// carries no state.
#[derive(Default)]
pub struct NullSink;

impl NullSink {
    /// Creates a null sink.
    pub fn new() -> Self {
        NullSink
    }
}
277
278impl Sink for NullSink {
279    fn consume(&mut self, _chunk: DataChunk) -> Result<bool, OperatorError> {
280        Ok(true)
281    }
282
283    fn finalize(&mut self) -> Result<(), OperatorError> {
284        Ok(())
285    }
286
287    fn name(&self) -> &'static str {
288        "NullSink"
289    }
290}
291
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Builds a single-column chunk of `Int64` values, one row per element.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let v: Vec<Value> = values.iter().map(|&i| Value::Int64(i)).collect();
        let vector = ValueVector::from_values(&v);
        DataChunk::new(vec![vector])
    }

    /// The collector stores every non-empty chunk and sums their rows.
    #[test]
    fn test_collector_sink() {
        let mut sink = CollectorSink::new();
        assert!(sink.is_empty());
        assert_eq!(sink.row_count(), 0);

        assert!(sink.consume(create_test_chunk(&[1, 2, 3])).unwrap());
        assert!(sink.consume(create_test_chunk(&[4, 5])).unwrap());
        sink.finalize().unwrap();

        assert!(!sink.is_empty());
        assert_eq!(sink.row_count(), 5);
        assert_eq!(sink.chunks().len(), 2);
        assert_eq!(sink.name(), "CollectorSink");
        assert_eq!(sink.into_chunks().len(), 2);
    }

    /// The materializing sink buffers chunks and can merge them into one.
    #[test]
    fn test_materializing_sink() {
        let mut sink = MaterializingSink::new();
        assert_eq!(sink.memory_bytes(), 0);

        assert!(sink.consume(create_test_chunk(&[1, 2])).unwrap());
        assert!(sink.consume(create_test_chunk(&[3, 4])).unwrap());
        sink.finalize().unwrap();

        assert_eq!(sink.row_count(), 4);
        assert_eq!(sink.chunks().len(), 2);
        // The estimate is a heuristic, so only check that it grew.
        assert!(sink.memory_bytes() > 0);
        assert_eq!(sink.name(), "MaterializingSink");

        let merged = sink.into_single_chunk();
        assert_eq!(merged.len(), 4);
    }

    /// The limiting sink caps the counted rows and rejects further input
    /// once the limit is reached.
    #[test]
    fn test_limiting_sink() {
        let mut sink = LimitingSink::new(3);
        assert_eq!(sink.limit(), 3);
        assert_eq!(sink.row_count(), 0);

        // First chunk - 2 rows, under limit
        let should_continue = sink.consume(create_test_chunk(&[1, 2])).unwrap();
        assert!(should_continue);
        assert!(!sink.is_full());
        assert_eq!(sink.row_count(), 2);

        // Second chunk - 3 rows, would exceed limit
        let should_continue = sink.consume(create_test_chunk(&[3, 4, 5])).unwrap();
        assert!(!should_continue);
        assert!(sink.is_full());
        // The counted total never exceeds the limit.
        assert_eq!(sink.row_count(), 3);
        assert_eq!(sink.chunks().len(), 2);

        // Third chunk - sink is full, so it must be rejected and not stored
        let should_continue = sink.consume(create_test_chunk(&[6])).unwrap();
        assert!(!should_continue);
        assert_eq!(sink.row_count(), 3);
        assert_eq!(sink.chunks().len(), 2);

        sink.finalize().unwrap();
        assert_eq!(sink.name(), "LimitingSink");
    }

    /// A zero limit means the sink is full from the start.
    #[test]
    fn test_limiting_sink_zero_limit() {
        let mut sink = LimitingSink::new(0);
        assert!(sink.is_full());

        let should_continue = sink.consume(create_test_chunk(&[1])).unwrap();
        assert!(!should_continue);
        assert_eq!(sink.row_count(), 0);
        assert!(sink.chunks().is_empty());
    }

    /// The counting sink sums rows across chunks.
    #[test]
    fn test_counting_sink() {
        let mut sink = CountingSink::new();
        assert_eq!(sink.count(), 0);

        assert!(sink.consume(create_test_chunk(&[1, 2, 3])).unwrap());
        assert!(sink.consume(create_test_chunk(&[4, 5])).unwrap());
        sink.finalize().unwrap();

        assert_eq!(sink.count(), 5);
        assert_eq!(sink.name(), "CountingSink");
    }

    /// The null sink accepts everything and always asks for more.
    #[test]
    fn test_null_sink() {
        let mut sink = NullSink::new();

        assert!(sink.consume(create_test_chunk(&[1, 2, 3])).unwrap());
        assert!(sink.consume(create_test_chunk(&[4, 5])).unwrap());
        sink.finalize().unwrap();

        assert_eq!(sink.name(), "NullSink");
    }
}
366}