use crate::error::SynthError;
/// A destination that accepts items of a single type.
///
/// Implementors must supply [`write`](Sink::write), [`flush`](Sink::flush),
/// [`close`](Sink::close) and [`items_written`](Sink::items_written).
/// [`write_batch`](Sink::write_batch) and
/// [`bytes_written`](Sink::bytes_written) come with default implementations.
pub trait Sink {
    /// The type of value this sink consumes.
    type Item;

    /// Writes a single item to the sink.
    ///
    /// # Errors
    ///
    /// Returns a [`SynthError`] when the implementor cannot accept the item.
    fn write(&mut self, item: Self::Item) -> Result<(), SynthError>;

    /// Writes every element of `items`, in order.
    ///
    /// The default implementation forwards each element to
    /// [`write`](Sink::write) and stops at the first failure; items that were
    /// written before the failure stay written, the rest are dropped.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by [`write`](Sink::write).
    fn write_batch(&mut self, items: Vec<Self::Item>) -> Result<(), SynthError> {
        items.into_iter().try_for_each(|item| self.write(item))
    }

    /// Implementor-defined flush operation.
    ///
    /// NOTE(review): presumably pushes buffered items to the underlying
    /// destination — confirm against the concrete sinks.
    fn flush(&mut self) -> Result<(), SynthError>;

    /// Consumes the sink.
    ///
    /// The `Self: Sized` bound is required because the method takes `self` by
    /// value, which also makes it unavailable on trait objects.
    fn close(self) -> Result<(), SynthError>
    where
        Self: Sized;

    /// Number of items reported as written by the implementor.
    fn items_written(&self) -> u64;

    /// Number of bytes written, if the sink tracks it.
    ///
    /// The default implementation returns `None`.
    fn bytes_written(&self) -> Option<u64> {
        None
    }
}
/// A sink that throws away every item and only remembers how many it saw.
///
/// In this file it does not implement the `Sink` trait; it accepts values of
/// any type through [`NullSink::write_any`] instead.
pub struct NullSink {
    /// Number of items passed to `write_any` so far.
    count: u64,
}

impl NullSink {
    /// Creates a sink whose counter starts at zero.
    pub fn new() -> Self {
        NullSink { count: 0 }
    }

    /// Discards `_item` and increments the item counter by one.
    pub fn write_any<T>(&mut self, _item: T) {
        self.count += 1;
    }

    /// Returns how many items have been passed to [`NullSink::write_any`].
    pub fn items_written(&self) -> u64 {
        let Self { count } = self;
        *count
    }
}

impl Default for NullSink {
    /// Equivalent to [`NullSink::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// A sink that keeps every item in an in-memory `Vec`, in insertion order.
pub struct VecSink<T> {
    /// Items collected so far.
    items: Vec<T>,
}

impl<T> VecSink<T> {
    /// Creates an empty sink.
    pub fn new() -> Self {
        let items = Vec::new();
        Self { items }
    }

    /// Creates an empty sink whose backing vector is created with
    /// `Vec::with_capacity(capacity)`.
    pub fn with_capacity(capacity: usize) -> Self {
        let items = Vec::with_capacity(capacity);
        Self { items }
    }

    /// Consumes the sink and hands back the collected items.
    pub fn into_items(self) -> Vec<T> {
        let Self { items } = self;
        items
    }

    /// Borrows the collected items as a slice.
    pub fn items(&self) -> &[T] {
        self.items.as_slice()
    }
}

// Written by hand rather than derived: `#[derive(Default)]` would add a
// `T: Default` bound that an empty vector does not need.
impl<T> Default for VecSink<T> {
    /// Equivalent to [`VecSink::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// `Sink` implementation that appends to the in-memory vector; none of the
/// operations below ever return an error.
impl<T> Sink for VecSink<T> {
    type Item = T;

    /// Appends `item` to the end of the collected items.
    fn write(&mut self, item: Self::Item) -> Result<(), SynthError> {
        let stored = &mut self.items;
        stored.push(item);
        Ok(())
    }

    /// Appends the whole batch at once, preserving its order, instead of
    /// going through `write` item by item.
    fn write_batch(&mut self, items: Vec<Self::Item>) -> Result<(), SynthError> {
        let mut batch = items;
        self.items.append(&mut batch);
        Ok(())
    }

    /// No-op: this implementation has nothing to flush.
    fn flush(&mut self) -> Result<(), SynthError> {
        Ok(())
    }

    /// Consumes the sink; the collected items are dropped with it.
    fn close(self) -> Result<(), SynthError> {
        Ok(())
    }

    /// Returns the number of items currently held.
    fn items_written(&self) -> u64 {
        let held = self.items.len();
        held as u64
    }
}
/// A [`Sink`] that can additionally address writes and flushes to a
/// partition identified by a key.
///
/// The trait only declares the operations; what a partition is and how
/// items are routed is left entirely to the implementor.
pub trait PartitionedSink: Sink {
/// Type of the key that identifies a partition.
type PartitionKey;
/// Writes `item` to the partition identified by `partition`.
///
/// NOTE(review): whether such writes are also reflected in
/// `Sink::items_written` is not specified here — confirm with implementors.
///
/// # Errors
///
/// Returns a [`SynthError`] when the implementor cannot perform the write.
fn write_to_partition(
&mut self,
partition: Self::PartitionKey,
item: Self::Item,
) -> Result<(), SynthError>;
/// Flushes only the partition identified by `partition`.
///
/// The key is taken by value, so callers that need it afterwards must keep
/// their own copy.
///
/// # Errors
///
/// Returns a [`SynthError`] when the implementor cannot flush the partition.
fn flush_partition(&mut self, partition: Self::PartitionKey) -> Result<(), SynthError>;
}
/// Buffering settings for a sink.
///
/// NOTE(review): nothing in this file reads these fields, so the field
/// descriptions below are inferred from their names — confirm against the
/// sinks that consume this configuration.
#[derive(Debug, Clone)]
pub struct SinkBufferConfig {
    /// Item-count limit (presumably the number of buffered items that
    /// triggers a flush).
    pub max_items: usize,
    /// Optional byte-size limit; `None` presumably means "no byte limit".
    pub max_bytes: Option<usize>,
    /// Presumably requests a flush after every write when `true`.
    pub flush_on_write: bool,
}

impl Default for SinkBufferConfig {
    /// Defaults: 10 000 items, a 64 MiB byte limit, and `flush_on_write` off.
    fn default() -> Self {
        const MIB: usize = 1024 * 1024;

        SinkBufferConfig {
            flush_on_write: false,
            max_bytes: Some(64 * MIB),
            max_items: 10_000,
        }
    }
}
#[cfg(test)]
// `unwrap` is acceptable in tests: an unexpected `Err` should fail the test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// `NullSink` counts every item it is handed, regardless of type.
    #[test]
    fn test_null_sink() {
        let mut sink = NullSink::new();
        for value in [42, 43] {
            sink.write_any(value);
        }

        assert_eq!(sink.items_written(), 2);
    }

    /// Single writes land in `VecSink` in insertion order.
    #[test]
    fn test_vec_sink() {
        let mut sink = VecSink::new();
        for value in 1..=3 {
            sink.write(value).unwrap();
        }

        assert_eq!(sink.items_written(), 3);
        let collected = sink.into_items();
        assert_eq!(collected, vec![1, 2, 3]);
    }

    /// Consecutive batches are appended one after another.
    #[test]
    fn test_vec_sink_batch() {
        let mut sink = VecSink::new();
        for batch in [vec![1, 2, 3], vec![4, 5]] {
            sink.write_batch(batch).unwrap();
        }

        assert_eq!(sink.items_written(), 5);
        let collected = sink.into_items();
        assert_eq!(collected, vec![1, 2, 3, 4, 5]);
    }
}