dge_runtime/component/
aggregate.rs1pub enum AggregationStatus<Aggregated> {
3 Ignore,
5 Aggregated(Aggregated),
7}
8
9#[macro_export]
10macro_rules! aggregate {
11 (
12 state=$state:ident, channel=$channel:ident, msg=$msg:ident,
13 aggregate=$aggregate:path,
14 accept_failure=$accept_failure:path,
15 output_queue=$output_queue:expr,
16 exchange=$exchange:expr $(,)?
17 ) => {
18 match $aggregate($state, &$msg).await {
19 Err(user_error) => {
20 warn!(
24 "failed to merge messages for {:?}, error is: {}, will be retried",
25 &$msg, user_error
26 );
27 Ok(Responsibility::Reject)
29 }
30 Ok(AggregationStatus::Ignore) => {
31 Ok(Responsibility::Accept)
32 }
33 Ok(AggregationStatus::Aggregated(merged_msg)) => {
34 $crate::maybe_send_to_next!(
35 &merged_msg,
36 $output_queue,
37 $channel,
38 $msg.into(),
39 $accept_failure,
40 $exchange,
41 )
42 }
43 }
44 };
45}