use super::chunk::DataChunk;
use super::operators::OperatorError;
use super::pipeline::Sink;
/// Sink that keeps every non-empty chunk it consumes and tracks the total
/// number of rows seen.
pub struct CollectorSink {
/// Non-empty chunks, in the order they were consumed.
chunks: Vec<DataChunk>,
/// Running sum of `len()` over every consumed chunk.
row_count: usize,
}
impl CollectorSink {
    /// Creates a sink that holds no chunks and has counted zero rows.
    pub fn new() -> Self {
        let chunks = Vec::new();
        Self {
            chunks,
            row_count: 0,
        }
    }

    /// Borrows the stored chunks in the order they were consumed.
    pub fn chunks(&self) -> &[DataChunk] {
        self.chunks.as_slice()
    }

    /// Consumes the sink and hands back the stored chunks.
    pub fn into_chunks(self) -> Vec<DataChunk> {
        let Self { chunks, .. } = self;
        chunks
    }

    /// Total number of rows counted so far.
    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Returns `true` while no chunk has been stored.
    pub fn is_empty(&self) -> bool {
        self.chunks().is_empty()
    }
}
impl Default for CollectorSink {
fn default() -> Self {
Self::new()
}
}
impl Sink for CollectorSink {
    /// Adds the chunk's length to the row total and stores the chunk unless it
    /// is empty.
    ///
    /// Always returns `Ok(true)`, so this sink never asks for input to stop.
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
        let incoming_rows = chunk.len();
        self.row_count += incoming_rows;

        // Empty chunks are counted (as zero rows) but never stored.
        let worth_keeping = !chunk.is_empty();
        if worth_keeping {
            self.chunks.push(chunk);
        }

        Ok(true)
    }

    /// Nothing to flush; always succeeds.
    fn finalize(&mut self) -> Result<(), OperatorError> {
        Ok(())
    }

    /// Static display name of this sink.
    fn name(&self) -> &'static str {
        const NAME: &'static str = "CollectorSink";
        NAME
    }

    /// Erases the concrete type so the caller can downcast the box later.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
        let erased: Box<dyn std::any::Any> = self;
        erased
    }
}
/// Sink that stores consumed chunks and keeps, besides a row total, a running
/// memory figure; the stored chunks can be merged with `into_single_chunk`.
pub struct MaterializingSink {
/// Non-empty chunks, in the order they were consumed.
chunks: Vec<DataChunk>,
/// Running sum of `len()` over every consumed chunk.
row_count: usize,
/// Running memory figure: grows by `len() * 64` per consumed chunk, so it is
/// a row-based approximation rather than a measured size.
memory_bytes: usize,
}
impl MaterializingSink {
    /// Creates a sink with no chunks, zero rows and a zero memory figure.
    pub fn new() -> Self {
        let chunks = Vec::new();
        Self {
            chunks,
            row_count: 0,
            memory_bytes: 0,
        }
    }

    /// Borrows the stored chunks in the order they were consumed.
    pub fn chunks(&self) -> &[DataChunk] {
        self.chunks.as_slice()
    }

    /// Consumes the sink and hands back the stored chunks.
    pub fn into_chunks(self) -> Vec<DataChunk> {
        let Self { chunks, .. } = self;
        chunks
    }

    /// Total number of rows counted so far.
    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Current value of the running memory figure.
    pub fn memory_bytes(&self) -> usize {
        self.memory_bytes
    }

    /// Consumes the sink and merges all stored chunks via `DataChunk::concat`.
    pub fn into_single_chunk(self) -> DataChunk {
        let parts = self.chunks;
        DataChunk::concat(&parts)
    }
}
impl Default for MaterializingSink {
fn default() -> Self {
Self::new()
}
}
impl Sink for MaterializingSink {
    /// Adds the chunk's length to the row total, bumps the memory figure, and
    /// stores the chunk unless it is empty.
    ///
    /// Always returns `Ok(true)`, so this sink never asks for input to stop.
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
        // Flat per-row figure for the memory tally.
        // NOTE(review): presumably a rough size estimate — confirm the intent.
        const BYTES_PER_ROW: usize = 64;

        let incoming_rows = chunk.len();
        self.row_count += incoming_rows;
        self.memory_bytes += incoming_rows * BYTES_PER_ROW;

        // Empty chunks contribute zero to both tallies and are not stored.
        let worth_keeping = !chunk.is_empty();
        if worth_keeping {
            self.chunks.push(chunk);
        }

        Ok(true)
    }

    /// Nothing to flush; always succeeds.
    fn finalize(&mut self) -> Result<(), OperatorError> {
        Ok(())
    }

    /// Static display name of this sink.
    fn name(&self) -> &'static str {
        const NAME: &'static str = "MaterializingSink";
        NAME
    }

    /// Erases the concrete type so the caller can downcast the box later.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
        let erased: Box<dyn std::any::Any> = self;
        erased
    }
}
/// Sink that forwards chunks to an inner `CollectorSink` until `limit` rows
/// have been counted, then reports that no more input is wanted.
pub struct LimitingSink {
/// Collector that actually stores the accepted chunks.
inner: CollectorSink,
/// Number of rows to count before the sink reports itself full.
limit: usize,
/// Rows counted toward `limit` so far; `consume` never raises it above `limit`.
collected: usize,
}
impl LimitingSink {
    /// Creates a sink that counts at most `limit` rows, backed by a fresh
    /// `CollectorSink`.
    pub fn new(limit: usize) -> Self {
        let inner = CollectorSink::new();
        Self {
            inner,
            limit,
            collected: 0,
        }
    }

    /// The configured row limit.
    pub fn limit(&self) -> usize {
        self.limit
    }

    /// Returns `true` once the counted rows have reached the limit.
    pub fn is_full(&self) -> bool {
        self.limit <= self.collected
    }

    /// Borrows the chunks stored by the inner collector.
    pub fn chunks(&self) -> &[DataChunk] {
        let stored = self.inner.chunks();
        stored
    }

    /// Consumes the sink and hands back the inner collector's chunks.
    pub fn into_chunks(self) -> Vec<DataChunk> {
        let Self { inner, .. } = self;
        inner.into_chunks()
    }

    /// Rows counted toward the limit so far.
    ///
    /// This is the sink's own counter, not the inner collector's row total;
    /// `consume` stores an overshooting chunk whole, so the stored chunks may
    /// hold more rows than this value.
    pub fn row_count(&self) -> usize {
        self.collected
    }
}
impl Sink for LimitingSink {
fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
if self.collected >= self.limit {
return Ok(false);
}
let rows_needed = self.limit - self.collected;
let chunk_len = chunk.len();
if chunk_len <= rows_needed {
self.collected += chunk_len;
self.inner.consume(chunk)?;
} else {
self.collected += rows_needed;
self.inner.consume(chunk)?;
}
Ok(self.collected < self.limit)
}
fn finalize(&mut self) -> Result<(), OperatorError> {
self.inner.finalize()
}
fn name(&self) -> &'static str {
"LimitingSink"
}
fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
self
}
}
/// Sink that only counts rows; consumed chunks are not stored.
pub struct CountingSink {
/// Running sum of `len()` over every consumed chunk.
count: usize,
}
impl CountingSink {
    /// Creates a counter starting at zero.
    pub fn new() -> Self {
        let count = 0;
        Self { count }
    }

    /// Number of rows counted so far.
    pub fn count(&self) -> usize {
        let Self { count } = self;
        *count
    }
}
impl Default for CountingSink {
fn default() -> Self {
Self::new()
}
}
impl Sink for CountingSink {
    /// Adds the chunk's length to the counter and discards the chunk.
    ///
    /// Always returns `Ok(true)`, so this sink never asks for input to stop.
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
        let incoming_rows = chunk.len();
        self.count += incoming_rows;
        Ok(true)
    }

    /// Nothing to flush; always succeeds.
    fn finalize(&mut self) -> Result<(), OperatorError> {
        Ok(())
    }

    /// Static display name of this sink.
    fn name(&self) -> &'static str {
        const NAME: &'static str = "CountingSink";
        NAME
    }

    /// Erases the concrete type so the caller can downcast the box later.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
        let erased: Box<dyn std::any::Any> = self;
        erased
    }
}
/// Sink that discards every chunk it is given and keeps no state.
pub struct NullSink;
impl NullSink {
pub fn new() -> Self {
Self
}
}
impl Default for NullSink {
fn default() -> Self {
Self::new()
}
}
impl Sink for NullSink {
    /// Drops the chunk untouched.
    ///
    /// Always returns `Ok(true)`, so this sink never asks for input to stop.
    fn consume(&mut self, _chunk: DataChunk) -> Result<bool, OperatorError> {
        let keep_going = true;
        Ok(keep_going)
    }

    /// Nothing to flush; always succeeds.
    fn finalize(&mut self) -> Result<(), OperatorError> {
        Ok(())
    }

    /// Static display name of this sink.
    fn name(&self) -> &'static str {
        const NAME: &'static str = "NullSink";
        NAME
    }

    /// Erases the concrete type so the caller can downcast the box later.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any> {
        let erased: Box<dyn std::any::Any> = self;
        erased
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Builds a chunk with one column holding `values` as `Value::Int64`.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let cells: Vec<Value> = values.iter().copied().map(Value::Int64).collect();
        let column = ValueVector::from_values(&cells);
        DataChunk::new(vec![column])
    }

    /// Feeds each batch to `sink` as its own chunk, panicking on any error and
    /// ignoring the returned flag.
    fn feed_all<S: Sink>(sink: &mut S, batches: &[&[i64]]) {
        for batch in batches.iter() {
            sink.consume(create_test_chunk(batch)).unwrap();
        }
    }

    /// Two chunks in: both are stored and their rows are summed.
    #[test]
    fn test_collector_sink() {
        let mut collector = CollectorSink::new();
        feed_all(&mut collector, &[&[1, 2, 3], &[4, 5]]);
        collector.finalize().unwrap();

        assert_eq!(collector.row_count(), 5);
        assert_eq!(collector.chunks().len(), 2);
    }

    /// Stored chunks merge into one chunk holding every row.
    #[test]
    fn test_materializing_sink() {
        let mut materializer = MaterializingSink::new();
        feed_all(&mut materializer, &[&[1, 2], &[3, 4]]);
        materializer.finalize().unwrap();

        assert_eq!(materializer.row_count(), 4);
        let combined = materializer.into_single_chunk();
        assert_eq!(combined.len(), 4);
    }

    /// The sink keeps asking for input until the limit is reached.
    #[test]
    fn test_limiting_sink() {
        let mut limited = LimitingSink::new(3);

        let keep_going = limited.consume(create_test_chunk(&[1, 2])).unwrap();
        assert!(keep_going);
        assert!(!limited.is_full());

        let keep_going = limited.consume(create_test_chunk(&[3, 4, 5])).unwrap();
        assert!(!keep_going);
        assert!(limited.is_full());
    }

    /// Row lengths are summed across chunks.
    #[test]
    fn test_counting_sink() {
        let mut counter = CountingSink::new();
        feed_all(&mut counter, &[&[1, 2, 3], &[4, 5]]);
        counter.finalize().unwrap();

        assert_eq!(counter.count(), 5);
    }

    /// Consuming and finalizing a null sink never fails.
    #[test]
    fn test_null_sink() {
        let mut discard = NullSink::new();
        feed_all(&mut discard, &[&[1, 2, 3], &[4, 5]]);
        discard.finalize().unwrap();
    }

    /// A fresh collector is empty in every observable way.
    #[test]
    fn test_collector_sink_empty() {
        let collector = CollectorSink::new();

        assert!(collector.is_empty());
        assert_eq!(collector.row_count(), 0);
        assert_eq!(collector.chunks().len(), 0);
    }

    /// `into_chunks` returns the stored chunks in arrival order.
    #[test]
    fn test_collector_sink_into_chunks() {
        let mut collector = CollectorSink::new();
        feed_all(&mut collector, &[&[1, 2], &[3]]);
        assert!(!collector.is_empty());

        let stored = collector.into_chunks();
        assert_eq!(stored.len(), 2);
        assert_eq!(stored[0].len(), 2);
        assert_eq!(stored[1].len(), 1);
    }

    /// The memory figure starts at zero and grows after a non-empty chunk.
    #[test]
    fn test_materializing_sink_memory_bytes() {
        let mut materializer = MaterializingSink::new();
        assert_eq!(materializer.memory_bytes(), 0);

        feed_all(&mut materializer, &[&[1, 2, 3]]);
        assert!(materializer.memory_bytes() > 0);
    }

    /// `into_chunks` hands back one entry per consumed chunk.
    #[test]
    fn test_materializing_sink_into_chunks() {
        let mut materializer = MaterializingSink::new();
        feed_all(&mut materializer, &[&[1, 2], &[3, 4]]);

        let stored = materializer.into_chunks();
        assert_eq!(stored.len(), 2);
    }

    /// The limit is reported back and a fresh sink is not full.
    #[test]
    fn test_limiting_sink_limit_getter() {
        let limited = LimitingSink::new(42);

        assert_eq!(limited.limit(), 42);
        assert!(!limited.is_full());
    }

    /// Chunks accepted below the limit are all retrievable.
    #[test]
    fn test_limiting_sink_into_chunks() {
        let mut limited = LimitingSink::new(5);
        feed_all(&mut limited, &[&[1, 2], &[3, 4]]);

        let stored = limited.into_chunks();
        assert_eq!(stored.len(), 2);
    }

    /// A fresh counter reads zero.
    #[test]
    fn test_counting_sink_empty() {
        let counter = CountingSink::new();
        assert_eq!(counter.count(), 0);
    }

    /// A boxed `NullSink` downcasts back to its concrete type.
    #[test]
    fn test_null_sink_into_any() {
        let boxed: Box<dyn Sink> = Box::new(NullSink::new());
        let erased = boxed.into_any();
        assert!(erased.downcast::<NullSink>().is_ok());
    }

    /// A boxed `CollectorSink` downcasts back to its concrete type.
    #[test]
    fn test_collector_sink_into_any() {
        let boxed: Box<dyn Sink> = Box::new(CollectorSink::new());
        let erased = boxed.into_any();
        assert!(erased.downcast::<CollectorSink>().is_ok());
    }
}