crabka_client_streams/processor/
mod.rs1pub mod api;
4pub mod erased;
5pub(crate) mod factory;
6pub mod fixed_key;
7pub(crate) mod graph;
8pub(crate) mod node;
9pub mod punctuation;
10pub mod record;
11pub mod serde;
12
13pub mod schema_serde;
14
15pub use api::{Processor, ProcessorContext, ProcessorSupplier};
16pub use erased::ProcessorError;
17pub use fixed_key::{
18 FixedKeyProcessor, FixedKeyProcessorContext, FixedKeyProcessorSupplier, FixedKeyRecord,
19};
20pub use punctuation::{Cancellable, PunctuationType, Punctuator};
21pub use record::{Record, RecordContext};
22pub use serde::{
23 BytesSerde, Changed, Consumed, DefaultSerde, I64Serde, Produced, Serde, SerdeError, SerdeRole,
24 StringSerde,
25};
26
/// Generates an `impl Processor<KIn, VIn, KOut, VOut>` for a type from a
/// compact, trait-like description.
///
/// Accepted input:
///
/// ```text
/// impl_processor! {
///     impl MyProcessor: (KIn, VIn) -> (KOut, VOut) {
///         async fn init(&mut self, ctx) { /* optional */ }
///         async fn process(&mut self, ctx, record) { /* required */ }
///         async fn close(&mut self) { /* optional */ }
///     }
/// }
/// ```
///
/// The macro supplies the parameter types that are omitted in the input:
///
/// * the context identifier is bound as
///   `&mut ProcessorContext<'_, '_, KOut, VOut>`;
/// * the record identifier is bound as `Record<KIn, VIn>`.
///
/// `init` and `close` are only emitted when they are written in the input.
/// No return type is emitted for any method, so every body has to evaluate
/// to `()`. The generated impl is annotated with `$crate::__async_trait`.
///
/// The receiver must be written as `self`. It is captured as an identifier and
/// re-emitted in the generated signature rather than being spelled out by the
/// macro itself, so that a `self` used inside the supplied bodies refers to the
/// method receiver.
#[macro_export]
macro_rules! impl_processor {
    (
        impl $processor:ty: ($kin:ty, $vin:ty) -> ($kout:ty, $vout:ty) {
            $(
                async fn init(&mut $init_this:ident, $init_ctx:ident) $init_body:block
            )?
            async fn process(&mut $this:ident, $ctx:ident, $record:ident) $process_body:block
            $(
                async fn close(&mut $close_this:ident) $close_body:block
            )?
        }
    ) => {
        #[$crate::__async_trait]
        impl $crate::processor::api::Processor<$kin, $vin, $kout, $vout> for $processor {
            $(
                async fn init(
                    // Re-emit the caller's own `self` token: `macro_rules!` hygiene
                    // would otherwise keep a macro-written `self` separate from the
                    // `self` that appears inside the caller-provided body.
                    &mut $init_this,
                    $init_ctx: &mut $crate::processor::api::ProcessorContext<'_, '_, $kout, $vout>,
                ) $init_body
            )?

            async fn process(
                &mut $this,
                $ctx: &mut $crate::processor::api::ProcessorContext<'_, '_, $kout, $vout>,
                $record: $crate::processor::record::Record<$kin, $vin>,
            ) $process_body

            $(
                async fn close(&mut $close_this) $close_body
            )?
        }
    };
}
78
/// Generates an `impl FixedKeyProcessor<KIn, VIn, VOut>` for a type from a
/// compact, trait-like description.
///
/// Accepted input:
///
/// ```text
/// impl_fixed_key_processor! {
///     impl MyProcessor: (KIn, VIn) -> VOut {
///         async fn process(&mut self, ctx, record) { /* required */ }
///     }
/// }
/// ```
///
/// The macro supplies the parameter types that are omitted in the input:
///
/// * the context identifier is bound as
///   `&mut FixedKeyProcessorContext<'_, '_, '_, KIn, VOut>`;
/// * the record identifier is bound as `FixedKeyRecord<KIn, VIn>`.
///
/// Only `process` can be provided through this macro. No return type is
/// emitted, so the body has to evaluate to `()`. The generated impl is
/// annotated with `$crate::__async_trait`.
///
/// The receiver must be written as `self`. It is captured as an identifier and
/// re-emitted in the generated signature rather than being spelled out by the
/// macro itself, so that a `self` used inside the supplied body refers to the
/// method receiver.
#[macro_export]
macro_rules! impl_fixed_key_processor {
    (
        impl $processor:ty: ($kin:ty, $vin:ty) -> $vout:ty {
            async fn process(&mut $this:ident, $ctx:ident, $record:ident) $process_body:block
        }
    ) => {
        #[$crate::__async_trait]
        impl $crate::processor::fixed_key::FixedKeyProcessor<$kin, $vin, $vout> for $processor {
            async fn process(
                // Re-emit the caller's own `self` token: `macro_rules!` hygiene
                // would otherwise keep a macro-written `self` separate from the
                // `self` that appears inside the caller-provided body.
                &mut $this,
                $ctx: &mut $crate::processor::fixed_key::FixedKeyProcessorContext<
                    '_,
                    '_,
                    '_,
                    $kin,
                    $vout,
                >,
                $record: $crate::processor::fixed_key::FixedKeyRecord<$kin, $vin>,
            ) $process_body
        }
    };
}