use crate::transport::{OutputBatchType, Step};
use anyhow::Result as AnyResult;
use feldera_types::postprocess::PostprocessorConfig;
use std::collections::BTreeMap;
use std::fmt::{Display, Formatter, Result as FmtResult};
use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PostprocessorCreateError {
ConfigurationError(String),
FactoryNotFound(String),
}
impl Display for PostprocessorCreateError {
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
match self {
PostprocessorCreateError::ConfigurationError(msg) => {
write!(f, "Configuration error: {}", msg)
}
PostprocessorCreateError::FactoryNotFound(msg) => {
write!(
f,
"Could not locate factory generating postprocessor: {}",
msg
)
}
}
}
}
impl std::error::Error for PostprocessorCreateError {}
pub trait Postprocessor: Send + Sync {
fn batch_start(&mut self, _step: Step, _batch_type: OutputBatchType) {}
fn push_buffer(&mut self, buffer: &[u8]) -> AnyResult<Vec<u8>> {
Ok(buffer.to_vec())
}
#[allow(clippy::type_complexity)]
fn push_key(
&mut self,
key: Option<&[u8]>,
val: Option<&[u8]>,
headers: &[(&str, Option<&[u8]>)],
) -> AnyResult<(
Option<Vec<u8>>,
Option<Vec<u8>>,
Vec<(String, Option<Vec<u8>>)>,
)> {
Ok((
key.map(<[u8]>::to_vec),
val.map(<[u8]>::to_vec),
headers
.iter()
.map(|(k, v)| (k.to_string(), v.map(<[u8]>::to_vec)))
.collect(),
))
}
fn batch_end(&mut self) {}
fn memory(&self) -> usize {
0
}
fn fork(&self) -> Box<dyn Postprocessor>;
}
pub trait PostprocessorFactory: Send + Sync {
fn create(
&self,
config: &PostprocessorConfig,
) -> Result<Box<dyn Postprocessor>, PostprocessorCreateError>;
}
#[derive(Default)]
pub struct PostprocessorRegistry {
registered: BTreeMap<&'static str, Arc<dyn PostprocessorFactory>>,
}
impl PostprocessorRegistry {
pub fn new() -> Self {
Self {
registered: BTreeMap::new(),
}
}
pub fn register(&mut self, name: &'static str, factory: Box<dyn PostprocessorFactory>) {
self.registered.insert(name, Arc::from(factory));
}
pub fn get(&self, name: &str) -> Option<Arc<dyn PostprocessorFactory>> {
self.registered.get(name).cloned()
}
}