macro_rules! impl_processor {
(
impl $processor:ty: ($kin:ty, $vin:ty) -> ($kout:ty, $vout:ty) {
$(
async fn init(&mut self, $init_ctx:ident) $init_body:block
)?
async fn process(&mut self, $ctx:ident, $record:ident) $process_body:block
$(
async fn close(&mut self) $close_body:block
)?
}
) => { ... };
}
Implement `Processor` for a type with a compact `(input_key, input_value) -> (output_key, output_value)` declaration. Only the `process` method is required; the `init` and `close` methods are optional.
use crabka_client_streams::{Record, impl_processor};
struct Upper;
impl_processor! {
impl Upper: (String, String) -> (String, String) {
async fn process(&mut self, ctx, r) {
ctx.forward(Record::new(r.key, r.value.to_uppercase(), r.timestamp));
}
}
}