grafeo_core/execution/
sink.rs1use super::chunk::DataChunk;
6use super::operators::OperatorError;
7use super::pipeline::Sink;
8
9pub struct CollectorSink {
11 chunks: Vec<DataChunk>,
12 row_count: usize,
13}
14
15impl CollectorSink {
16 pub fn new() -> Self {
18 Self {
19 chunks: Vec::new(),
20 row_count: 0,
21 }
22 }
23
24 pub fn chunks(&self) -> &[DataChunk] {
26 &self.chunks
27 }
28
29 pub fn into_chunks(self) -> Vec<DataChunk> {
31 self.chunks
32 }
33
34 pub fn row_count(&self) -> usize {
36 self.row_count
37 }
38
39 pub fn is_empty(&self) -> bool {
41 self.chunks.is_empty()
42 }
43}
44
45impl Default for CollectorSink {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl Sink for CollectorSink {
52 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
53 self.row_count += chunk.len();
54 if !chunk.is_empty() {
55 self.chunks.push(chunk);
56 }
57 Ok(true)
58 }
59
60 fn finalize(&mut self) -> Result<(), OperatorError> {
61 Ok(())
62 }
63
64 fn name(&self) -> &'static str {
65 "CollectorSink"
66 }
67
68 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
69 self
70 }
71}
72
73pub struct MaterializingSink {
77 chunks: Vec<DataChunk>,
78 row_count: usize,
79 memory_bytes: usize,
80}
81
82impl MaterializingSink {
83 pub fn new() -> Self {
85 Self {
86 chunks: Vec::new(),
87 row_count: 0,
88 memory_bytes: 0,
89 }
90 }
91
92 pub fn chunks(&self) -> &[DataChunk] {
94 &self.chunks
95 }
96
97 pub fn into_chunks(self) -> Vec<DataChunk> {
99 self.chunks
100 }
101
102 pub fn row_count(&self) -> usize {
104 self.row_count
105 }
106
107 pub fn memory_bytes(&self) -> usize {
109 self.memory_bytes
110 }
111
112 pub fn into_single_chunk(self) -> DataChunk {
114 DataChunk::concat(&self.chunks)
115 }
116}
117
118impl Default for MaterializingSink {
119 fn default() -> Self {
120 Self::new()
121 }
122}
123
124impl Sink for MaterializingSink {
125 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
126 self.row_count += chunk.len();
127 self.memory_bytes += chunk.len() * 64;
129 if !chunk.is_empty() {
130 self.chunks.push(chunk);
131 }
132 Ok(true)
133 }
134
135 fn finalize(&mut self) -> Result<(), OperatorError> {
136 Ok(())
137 }
138
139 fn name(&self) -> &'static str {
140 "MaterializingSink"
141 }
142
143 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
144 self
145 }
146}
147
148pub struct LimitingSink {
152 inner: CollectorSink,
153 limit: usize,
154 collected: usize,
155}
156
157impl LimitingSink {
158 pub fn new(limit: usize) -> Self {
160 Self {
161 inner: CollectorSink::new(),
162 limit,
163 collected: 0,
164 }
165 }
166
167 pub fn limit(&self) -> usize {
169 self.limit
170 }
171
172 pub fn is_full(&self) -> bool {
174 self.collected >= self.limit
175 }
176
177 pub fn chunks(&self) -> &[DataChunk] {
179 self.inner.chunks()
180 }
181
182 pub fn into_chunks(self) -> Vec<DataChunk> {
184 self.inner.into_chunks()
185 }
186
187 pub fn row_count(&self) -> usize {
189 self.collected
190 }
191}
192
193impl Sink for LimitingSink {
194 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
195 if self.collected >= self.limit {
196 return Ok(false);
198 }
199
200 let rows_needed = self.limit - self.collected;
201 let chunk_len = chunk.len();
202
203 if chunk_len <= rows_needed {
204 self.collected += chunk_len;
206 self.inner.consume(chunk)?;
207 } else {
208 self.collected += rows_needed;
212 self.inner.consume(chunk)?;
213 }
214
215 Ok(self.collected < self.limit)
217 }
218
219 fn finalize(&mut self) -> Result<(), OperatorError> {
220 self.inner.finalize()
221 }
222
223 fn name(&self) -> &'static str {
224 "LimitingSink"
225 }
226
227 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
228 self
229 }
230}
231
/// Sink that only keeps a tally of rows.
pub struct CountingSink {
    /// Rows tallied so far.
    count: usize,
}

impl CountingSink {
    /// Builds a counter starting at zero.
    pub fn new() -> Self {
        let count = 0;
        Self { count }
    }

    /// Current tally.
    pub fn count(&self) -> usize {
        let Self { count } = self;
        *count
    }
}

/// The default counter is the same as the one produced by `new`.
impl Default for CountingSink {
    fn default() -> Self {
        CountingSink::new()
    }
}
256
257impl Sink for CountingSink {
258 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
259 self.count += chunk.len();
260 Ok(true)
261 }
262
263 fn finalize(&mut self) -> Result<(), OperatorError> {
264 Ok(())
265 }
266
267 fn name(&self) -> &'static str {
268 "CountingSink"
269 }
270
271 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
272 self
273 }
274}
275
/// Stateless sink: a unit struct with no fields.
pub struct NullSink;

impl NullSink {
    /// Returns the unit value of this type.
    pub fn new() -> Self {
        NullSink
    }
}

/// The default sink is the same as the one produced by `new`.
impl Default for NullSink {
    fn default() -> Self {
        NullSink::new()
    }
}
293
294impl Sink for NullSink {
295 fn consume(&mut self, _chunk: DataChunk) -> Result<bool, OperatorError> {
296 Ok(true)
297 }
298
299 fn finalize(&mut self) -> Result<(), OperatorError> {
300 Ok(())
301 }
302
303 fn name(&self) -> &'static str {
304 "NullSink"
305 }
306
307 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
308 self
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Builds a one-column chunk holding the given integers as `Int64`.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let column_values: Vec<Value> = values.iter().copied().map(Value::Int64).collect();
        let column = ValueVector::from_values(&column_values);
        DataChunk::new(vec![column])
    }

    /// Two chunks in, two chunks stored, five rows counted.
    #[test]
    fn test_collector_sink() {
        let mut collector = CollectorSink::new();

        collector.consume(create_test_chunk(&[1, 2, 3])).unwrap();
        collector.consume(create_test_chunk(&[4, 5])).unwrap();
        collector.finalize().unwrap();

        assert_eq!(collector.row_count(), 5);
        assert_eq!(collector.chunks().len(), 2);
    }

    /// Row total and merged chunk length agree after two pushes.
    #[test]
    fn test_materializing_sink() {
        let mut materializer = MaterializingSink::new();

        materializer.consume(create_test_chunk(&[1, 2])).unwrap();
        materializer.consume(create_test_chunk(&[3, 4])).unwrap();
        materializer.finalize().unwrap();

        assert_eq!(materializer.row_count(), 4);

        let combined = materializer.into_single_chunk();
        assert_eq!(combined.len(), 4);
    }

    /// The sink asks to continue below the limit and stops once it is hit.
    #[test]
    fn test_limiting_sink() {
        let mut limited = LimitingSink::new(3);

        let keep_going = limited.consume(create_test_chunk(&[1, 2])).unwrap();
        assert!(keep_going);
        assert!(!limited.is_full());

        let keep_going = limited.consume(create_test_chunk(&[3, 4, 5])).unwrap();
        assert!(!keep_going);
        assert!(limited.is_full());
    }

    /// The tally equals the total number of rows pushed.
    #[test]
    fn test_counting_sink() {
        let mut counter = CountingSink::new();

        counter.consume(create_test_chunk(&[1, 2, 3])).unwrap();
        counter.consume(create_test_chunk(&[4, 5])).unwrap();
        counter.finalize().unwrap();

        assert_eq!(counter.count(), 5);
    }

    /// Pushing into the null sink and finalizing it never errors.
    #[test]
    fn test_null_sink() {
        let mut discard = NullSink::new();

        discard.consume(create_test_chunk(&[1, 2, 3])).unwrap();
        discard.consume(create_test_chunk(&[4, 5])).unwrap();
        discard.finalize().unwrap();
    }

    /// A fresh collector is empty in every observable way.
    #[test]
    fn test_collector_sink_empty() {
        let collector = CollectorSink::new();
        assert!(collector.is_empty());
        assert_eq!(collector.row_count(), 0);
        assert_eq!(collector.chunks().len(), 0);
    }

    /// `into_chunks` returns the stored chunks in push order.
    #[test]
    fn test_collector_sink_into_chunks() {
        let mut collector = CollectorSink::new();
        collector.consume(create_test_chunk(&[1, 2])).unwrap();
        collector.consume(create_test_chunk(&[3])).unwrap();

        assert!(!collector.is_empty());
        let stored = collector.into_chunks();
        assert_eq!(stored.len(), 2);
        assert_eq!(stored[0].len(), 2);
        assert_eq!(stored[1].len(), 1);
    }

    /// The byte figure starts at zero and grows after a non-empty push.
    #[test]
    fn test_materializing_sink_memory_bytes() {
        let mut materializer = MaterializingSink::new();
        assert_eq!(materializer.memory_bytes(), 0);

        materializer.consume(create_test_chunk(&[1, 2, 3])).unwrap();
        assert!(materializer.memory_bytes() > 0);
    }

    /// `into_chunks` returns one entry per pushed chunk.
    #[test]
    fn test_materializing_sink_into_chunks() {
        let mut materializer = MaterializingSink::new();
        materializer.consume(create_test_chunk(&[1, 2])).unwrap();
        materializer.consume(create_test_chunk(&[3, 4])).unwrap();

        let stored = materializer.into_chunks();
        assert_eq!(stored.len(), 2);
    }

    /// The limit getter echoes the constructor argument.
    #[test]
    fn test_limiting_sink_limit_getter() {
        let limited = LimitingSink::new(42);
        assert_eq!(limited.limit(), 42);
        assert!(!limited.is_full());
    }

    /// Chunks pushed below the limit are all retrievable.
    #[test]
    fn test_limiting_sink_into_chunks() {
        let mut limited = LimitingSink::new(5);
        limited.consume(create_test_chunk(&[1, 2])).unwrap();
        limited.consume(create_test_chunk(&[3, 4])).unwrap();

        let stored = limited.into_chunks();
        assert_eq!(stored.len(), 2);
    }

    /// A fresh counter reads zero.
    #[test]
    fn test_counting_sink_empty() {
        let counter = CountingSink::new();
        assert_eq!(counter.count(), 0);
    }

    /// A boxed `NullSink` survives the round trip through `dyn Any`.
    #[test]
    fn test_null_sink_into_any() {
        let boxed: Box<dyn Sink> = Box::new(NullSink::new());
        let erased = boxed.into_any();
        assert!(erased.downcast::<NullSink>().is_ok());
    }

    /// A boxed `CollectorSink` survives the round trip through `dyn Any`.
    #[test]
    fn test_collector_sink_into_any() {
        let boxed: Box<dyn Sink> = Box::new(CollectorSink::new());
        let erased = boxed.into_any();
        assert!(erased.downcast::<CollectorSink>().is_ok());
    }
}