Skip to main content

crabka_client_streams/processor/
mod.rs

1//! Typed Processor API and the type-erased execution graph used by the runtime.
2
3pub mod api;
4pub mod erased;
5pub(crate) mod factory;
6pub mod fixed_key;
7pub(crate) mod graph;
8pub(crate) mod node;
9pub mod punctuation;
10pub mod record;
11pub mod serde;
12
13pub mod schema_serde;
14
15pub use api::{Processor, ProcessorContext, ProcessorSupplier};
16pub use erased::ProcessorError;
17pub use fixed_key::{
18    FixedKeyProcessor, FixedKeyProcessorContext, FixedKeyProcessorSupplier, FixedKeyRecord,
19};
20pub use punctuation::{Cancellable, PunctuationType, Punctuator};
21pub use record::{Record, RecordContext};
22pub use serde::{
23    BytesSerde, Changed, Consumed, DefaultSerde, I64Serde, Produced, Serde, SerdeError, SerdeRole,
24    StringSerde,
25};
26
/// Generate a [`Processor`] implementation from a terse
/// `(input_key, input_value) -> (output_key, output_value)` signature.
///
/// The macro body must contain an `async fn process(&mut self, ctx, record)`
/// method. An `async fn init(&mut self, ctx)` may be written before it and an
/// `async fn close(&mut self)` after it; each of those two is emitted into the
/// generated `impl` only when it is present in the invocation.
///
/// The identifiers chosen for the context and record parameters are the names
/// the method bodies use to refer to them. The context is bound as
/// `&mut ProcessorContext<'_, '_, output_key, output_value>` and the record as
/// `Record<input_key, input_value>`.
///
/// ```
/// use crabka_client_streams::{Record, impl_processor};
///
/// struct Upper;
/// impl_processor! {
///     impl Upper: (String, String) -> (String, String) {
///         async fn process(&mut self, ctx, r) {
///             ctx.forward(Record::new(r.key, r.value.to_uppercase(), r.timestamp));
///         }
///     }
/// }
/// ```
///
/// [`Processor`]: crate::processor::api::Processor
#[macro_export]
macro_rules! impl_processor {
    (
        impl $target:ty: ($key_in:ty, $val_in:ty) -> ($key_out:ty, $val_out:ty) {
            // Optional set-up hook; must precede `process` when supplied.
            $(
                async fn init(&mut self, $setup_ctx:ident) $setup:block
            )?
            // The one mandatory method.
            async fn process(&mut self, $context:ident, $input:ident) $body:block
            // Optional tear-down hook; must follow `process` when supplied.
            $(
                async fn close(&mut self) $teardown:block
            )?
        }
    ) => {
        #[$crate::__async_trait]
        impl $crate::processor::api::Processor<$key_in, $val_in, $key_out, $val_out> for $target {
            // Emitted only if the invocation contained an `init` method.
            $(
                async fn init(
                    &mut self,
                    $setup_ctx: &mut $crate::processor::api::ProcessorContext<
                        '_, '_, $key_out, $val_out
                    >,
                ) $setup
            )?

            async fn process(
                &mut self,
                $context: &mut $crate::processor::api::ProcessorContext<
                    '_, '_, $key_out, $val_out
                >,
                $input: $crate::processor::record::Record<$key_in, $val_in>,
            ) $body

            // Emitted only if the invocation contained a `close` method.
            $(
                async fn close(&mut self) $teardown
            )?
        }
    };
}
78
/// Generate a [`FixedKeyProcessor`] implementation from a terse
/// `(key, input_value) -> output_value` signature.
///
/// The macro body consists of a single `async fn process(&mut self, ctx, record)`
/// method. The identifiers chosen for the two parameters are the names the
/// method body uses to refer to them. The context is bound as
/// `&mut FixedKeyProcessorContext<'_, '_, '_, key, output_value>` and the
/// record as `FixedKeyRecord<key, input_value>`.
///
/// ```
/// use crabka_client_streams::impl_fixed_key_processor;
///
/// struct UpperValue;
/// impl_fixed_key_processor! {
///     impl UpperValue: (String, String) -> String {
///         async fn process(&mut self, ctx, r) {
///             let v = r.value.clone();
///             ctx.forward(r.with_value(v.to_uppercase()));
///         }
///     }
/// }
/// ```
///
/// [`FixedKeyProcessor`]: crate::processor::fixed_key::FixedKeyProcessor
#[macro_export]
macro_rules! impl_fixed_key_processor {
    (
        impl $target:ty: ($key:ty, $val_in:ty) -> $val_out:ty {
            async fn process(&mut self, $context:ident, $input:ident) $body:block
        }
    ) => {
        #[$crate::__async_trait]
        impl $crate::processor::fixed_key::FixedKeyProcessor<$key, $val_in, $val_out> for $target {
            async fn process(
                &mut self,
                // The key type appears in both the context and the record;
                // only the value type differs between input and output.
                $context: &mut $crate::processor::fixed_key::FixedKeyProcessorContext<
                    '_, '_, '_, $key, $val_out
                >,
                $input: $crate::processor::fixed_key::FixedKeyRecord<$key, $val_in>,
            ) $body
        }
    };
}
119}