use std::any::Any;
use std::marker::PhantomData;
use async_trait::async_trait;
use crate::processor::api::{Processor, ProcessorContext};
use crate::processor::record::Record;
type Marker<T> = PhantomData<fn() -> T>;
pub(crate) struct MapValuesProcessor<V, V2, F> {
pub f: F,
pub _pd: std::marker::PhantomData<fn(V) -> V2>,
}
#[async_trait]
impl<K, V, V2, F> Processor<K, V, K, V2> for MapValuesProcessor<V, V2, F>
where
K: Any + Send + Clone,
V: Send + 'static,
V2: Any + Send + Clone,
F: Fn(&V) -> V2 + Send + Sync + 'static,
{
async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, V2>, r: Record<K, V>) {
ctx.forward(Record::new(r.key, (self.f)(&r.value), r.timestamp));
}
}
pub(crate) struct FilterProcessor<K, V, P> {
pub predicate: P,
pub negate: bool,
pub _pd: std::marker::PhantomData<fn(K, V)>,
}
#[async_trait]
impl<K, V, P> Processor<K, V, K, V> for FilterProcessor<K, V, P>
where
K: Any + Send + Clone + Default,
V: Any + Send + Clone,
P: Fn(&K, &V) -> bool + Send + Sync + 'static,
{
async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, V>, r: Record<K, V>) {
let key = r.key.clone().unwrap_or_default();
if (self.predicate)(&key, &r.value) != self.negate {
ctx.forward(r);
}
}
}
pub(crate) struct MapProcessor<K, V, K2, V2, F> {
pub f: F,
pub _pd: Marker<(K, V, K2, V2)>,
}
#[async_trait]
impl<K, V, K2, V2, F> Processor<K, V, K2, V2> for MapProcessor<K, V, K2, V2, F>
where
K: Default + Send + 'static,
V: Send + 'static,
K2: Any + Send + Clone,
V2: Any + Send + Clone,
F: Fn(&K, &V) -> (K2, V2) + Send + Sync + 'static,
{
async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K2, V2>, r: Record<K, V>) {
let key = r.key.unwrap_or_default();
let (k2, v2) = (self.f)(&key, &r.value);
ctx.forward(Record::new(Some(k2), v2, r.timestamp));
}
}
pub(crate) struct SelectKeyProcessor<K, V, K2, F> {
pub f: F,
pub _pd: std::marker::PhantomData<fn(K, V) -> K2>,
}
#[async_trait]
impl<K, V, K2, F> Processor<K, V, K2, V> for SelectKeyProcessor<K, V, K2, F>
where
K: Default + Send + 'static,
V: Any + Send + Clone,
K2: Any + Send + Clone,
F: Fn(&K, &V) -> K2 + Send + Sync + 'static,
{
async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K2, V>, r: Record<K, V>) {
let key = r.key.unwrap_or_default();
let k2 = (self.f)(&key, &r.value);
ctx.forward(Record::new(Some(k2), r.value, r.timestamp));
}
}
pub(crate) struct FlatMapProcessor<K, V, K2, V2, IT, F> {
pub f: F,
pub _pd: Marker<(K, V, K2, V2, IT)>,
}
#[async_trait]
impl<K, V, K2, V2, IT, F> Processor<K, V, K2, V2> for FlatMapProcessor<K, V, K2, V2, IT, F>
where
K: Default + Send + 'static,
V: Send + 'static,
K2: Any + Send + Clone,
V2: Any + Send + Clone,
IT: IntoIterator<Item = (K2, V2)> + 'static,
F: Fn(&K, &V) -> IT + Send + Sync + 'static,
{
async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K2, V2>, r: Record<K, V>) {
let key = r.key.unwrap_or_default();
for (k2, v2) in (self.f)(&key, &r.value) {
ctx.forward(Record::new(Some(k2), v2, r.timestamp));
}
}
}
pub(crate) struct FlatMapValuesProcessor<V, V2, IT, F> {
pub f: F,
pub _pd: std::marker::PhantomData<fn(V) -> (V2, IT)>,
}
#[async_trait]
impl<K, V, V2, IT, F> Processor<K, V, K, V2> for FlatMapValuesProcessor<V, V2, IT, F>
where
K: Any + Send + Clone,
V: Send + 'static,
V2: Any + Send + Clone,
IT: IntoIterator<Item = V2> + 'static,
F: Fn(&V) -> IT + Send + Sync + 'static,
{
async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, V2>, r: Record<K, V>) {
for v2 in (self.f)(&r.value) {
ctx.forward(Record::new(r.key.clone(), v2, r.timestamp));
}
}
}
pub(crate) struct PeekProcessor<K, V, F> {
pub f: F,
pub _pd: std::marker::PhantomData<fn(K, V)>,
}
#[async_trait]
impl<K, V, F> Processor<K, V, K, V> for PeekProcessor<K, V, F>
where
K: Any + Send + Clone + Default,
V: Any + Send + Clone,
F: Fn(&K, &V) + Send + Sync + 'static,
{
async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, V>, r: Record<K, V>) {
let key = r.key.clone().unwrap_or_default();
(self.f)(&key, &r.value);
ctx.forward(r);
}
}
pub(crate) struct ForeachProcessor<K, V, F> {
pub f: F,
pub _pd: std::marker::PhantomData<fn(K, V)>,
}
#[async_trait]
impl<K, V, F> Processor<K, V, K, V> for ForeachProcessor<K, V, F>
where
K: Any + Send + Clone + Default,
V: Any + Send + Clone,
F: Fn(&K, &V) + Send + Sync + 'static,
{
async fn process(&mut self, _ctx: &mut ProcessorContext<'_, '_, K, V>, r: Record<K, V>) {
let key = r.key.unwrap_or_default();
(self.f)(&key, &r.value);
}
}
pub(crate) struct MergeProcessor<K, V> {
pub _pd: std::marker::PhantomData<fn(K, V)>,
}
#[async_trait]
impl<K, V> Processor<K, V, K, V> for MergeProcessor<K, V>
where
K: Any + Send + Clone,
V: Any + Send + Clone,
{
async fn process(&mut self, ctx: &mut ProcessorContext<'_, '_, K, V>, r: Record<K, V>) {
ctx.forward(r);
}
}