grafeo_core/execution/
sink.rs1use super::chunk::DataChunk;
6use super::operators::OperatorError;
7use super::pipeline::Sink;
8
9pub struct CollectorSink {
11 chunks: Vec<DataChunk>,
12 row_count: usize,
13}
14
15impl CollectorSink {
16 pub fn new() -> Self {
18 Self {
19 chunks: Vec::new(),
20 row_count: 0,
21 }
22 }
23
24 pub fn chunks(&self) -> &[DataChunk] {
26 &self.chunks
27 }
28
29 pub fn into_chunks(self) -> Vec<DataChunk> {
31 self.chunks
32 }
33
34 pub fn row_count(&self) -> usize {
36 self.row_count
37 }
38
39 pub fn is_empty(&self) -> bool {
41 self.chunks.is_empty()
42 }
43}
44
45impl Default for CollectorSink {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl Sink for CollectorSink {
52 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
53 self.row_count += chunk.len();
54 if !chunk.is_empty() {
55 self.chunks.push(chunk);
56 }
57 Ok(true)
58 }
59
60 fn finalize(&mut self) -> Result<(), OperatorError> {
61 Ok(())
62 }
63
64 fn name(&self) -> &'static str {
65 "CollectorSink"
66 }
67}
68
69pub struct MaterializingSink {
73 chunks: Vec<DataChunk>,
74 row_count: usize,
75 memory_bytes: usize,
76}
77
78impl MaterializingSink {
79 pub fn new() -> Self {
81 Self {
82 chunks: Vec::new(),
83 row_count: 0,
84 memory_bytes: 0,
85 }
86 }
87
88 pub fn chunks(&self) -> &[DataChunk] {
90 &self.chunks
91 }
92
93 pub fn into_chunks(self) -> Vec<DataChunk> {
95 self.chunks
96 }
97
98 pub fn row_count(&self) -> usize {
100 self.row_count
101 }
102
103 pub fn memory_bytes(&self) -> usize {
105 self.memory_bytes
106 }
107
108 pub fn into_single_chunk(self) -> DataChunk {
110 DataChunk::concat(&self.chunks)
111 }
112}
113
114impl Default for MaterializingSink {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120impl Sink for MaterializingSink {
121 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
122 self.row_count += chunk.len();
123 self.memory_bytes += chunk.len() * 64;
125 if !chunk.is_empty() {
126 self.chunks.push(chunk);
127 }
128 Ok(true)
129 }
130
131 fn finalize(&mut self) -> Result<(), OperatorError> {
132 Ok(())
133 }
134
135 fn name(&self) -> &'static str {
136 "MaterializingSink"
137 }
138}
139
140pub struct LimitingSink {
144 inner: CollectorSink,
145 limit: usize,
146 collected: usize,
147}
148
149impl LimitingSink {
150 pub fn new(limit: usize) -> Self {
152 Self {
153 inner: CollectorSink::new(),
154 limit,
155 collected: 0,
156 }
157 }
158
159 pub fn limit(&self) -> usize {
161 self.limit
162 }
163
164 pub fn is_full(&self) -> bool {
166 self.collected >= self.limit
167 }
168
169 pub fn chunks(&self) -> &[DataChunk] {
171 self.inner.chunks()
172 }
173
174 pub fn into_chunks(self) -> Vec<DataChunk> {
176 self.inner.into_chunks()
177 }
178
179 pub fn row_count(&self) -> usize {
181 self.collected
182 }
183}
184
185impl Sink for LimitingSink {
186 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
187 if self.collected >= self.limit {
188 return Ok(false);
190 }
191
192 let rows_needed = self.limit - self.collected;
193 let chunk_len = chunk.len();
194
195 if chunk_len <= rows_needed {
196 self.collected += chunk_len;
198 self.inner.consume(chunk)?;
199 } else {
200 self.collected += rows_needed;
204 self.inner.consume(chunk)?;
205 }
206
207 Ok(self.collected < self.limit)
209 }
210
211 fn finalize(&mut self) -> Result<(), OperatorError> {
212 self.inner.finalize()
213 }
214
215 fn name(&self) -> &'static str {
216 "LimitingSink"
217 }
218}
219
/// Sink that only tallies how many rows pass through it.
#[derive(Default)]
pub struct CountingSink {
    /// Rows seen so far.
    count: usize,
}

impl CountingSink {
    /// Creates a sink whose tally starts at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Number of rows tallied so far.
    pub fn count(&self) -> usize {
        self.count
    }
}
244
245impl Sink for CountingSink {
246 fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
247 self.count += chunk.len();
248 Ok(true)
249 }
250
251 fn finalize(&mut self) -> Result<(), OperatorError> {
252 Ok(())
253 }
254
255 fn name(&self) -> &'static str {
256 "CountingSink"
257 }
258}
259
/// Stateless sink that accepts input and keeps nothing.
#[derive(Default)]
pub struct NullSink;

impl NullSink {
    /// Creates a new null sink.
    pub fn new() -> Self {
        NullSink
    }
}
277
278impl Sink for NullSink {
279 fn consume(&mut self, _chunk: DataChunk) -> Result<bool, OperatorError> {
280 Ok(true)
281 }
282
283 fn finalize(&mut self) -> Result<(), OperatorError> {
284 Ok(())
285 }
286
287 fn name(&self) -> &'static str {
288 "NullSink"
289 }
290}
291
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Builds a chunk wrapping one `ValueVector` that holds an `Int64` value
    /// per entry of `values`.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let cells: Vec<Value> = values.iter().copied().map(Value::Int64).collect();
        DataChunk::new(vec![ValueVector::from_values(&cells)])
    }

    /// The collector keeps both chunks and reports their combined row count.
    #[test]
    fn test_collector_sink() {
        let mut sink = CollectorSink::new();

        let batches: [&[i64]; 2] = [&[1, 2, 3], &[4, 5]];
        for rows in batches {
            sink.consume(create_test_chunk(rows)).unwrap();
        }
        sink.finalize().unwrap();

        assert_eq!(sink.row_count(), 5);
        assert_eq!(sink.chunks().len(), 2);
    }

    /// Materialized chunks can be merged into one chunk with all rows.
    #[test]
    fn test_materializing_sink() {
        let mut sink = MaterializingSink::new();

        let batches: [&[i64]; 2] = [&[1, 2], &[3, 4]];
        for rows in batches {
            sink.consume(create_test_chunk(rows)).unwrap();
        }
        sink.finalize().unwrap();

        assert_eq!(sink.row_count(), 4);

        let merged = sink.into_single_chunk();
        assert_eq!(merged.len(), 4);
    }

    /// The limiting sink asks for more input until the limit is reached.
    #[test]
    fn test_limiting_sink() {
        let mut sink = LimitingSink::new(3);

        // Two rows against a limit of three: still room for more.
        let wants_more = sink.consume(create_test_chunk(&[1, 2])).unwrap();
        assert!(wants_more);
        assert!(!sink.is_full());

        // Three further rows overshoot the limit: the sink reports full.
        let wants_more = sink.consume(create_test_chunk(&[3, 4, 5])).unwrap();
        assert!(!wants_more);
        assert!(sink.is_full());
    }

    /// The counting sink tallies rows across chunks.
    #[test]
    fn test_counting_sink() {
        let mut sink = CountingSink::new();

        sink.consume(create_test_chunk(&[1, 2, 3])).unwrap();
        sink.consume(create_test_chunk(&[4, 5])).unwrap();
        sink.finalize().unwrap();

        assert_eq!(sink.count(), 5);
    }

    /// The null sink accepts chunks and finalizes without error.
    #[test]
    fn test_null_sink() {
        let mut sink = NullSink::new();

        sink.consume(create_test_chunk(&[1, 2, 3])).unwrap();
        sink.consume(create_test_chunk(&[4, 5])).unwrap();
        sink.finalize().unwrap();
    }
}