dge_runtime/component/
aggregate.rs

1/// Status of merging multiple messages into one `Aggregated`
2pub enum AggregationStatus<Aggregated> {
3    /// The merge is incomplete, need more incoming messages.
4    Ignore,
5    /// For the first time, the multiple input messages are merged into one.
6    Aggregated(Aggregated),
7}
8
9#[macro_export]
10macro_rules! aggregate {
11    (
12        state=$state:ident, channel=$channel:ident, msg=$msg:ident,
13        aggregate=$aggregate:path,
14        accept_failure=$accept_failure:path,
15        output_queue=$output_queue:expr,
16        exchange=$exchange:expr $(,)?
17    ) => {
18        match $aggregate($state, &$msg).await {
19            Err(user_error) => {
20                // user handler returned error,
21                // since this may be a transient error (i.e. db op), we retry it.
22                // this behaviour is strictly for convenience
23                warn!(
24                    "failed to merge messages for {:?}, error is: {}, will be retried",
25                    &$msg, user_error
26                );
27                // reject the message to retry
28                Ok(Responsibility::Reject)
29            }
30            Ok(AggregationStatus::Ignore) => {
31                Ok(Responsibility::Accept)
32            }
33            Ok(AggregationStatus::Aggregated(merged_msg)) => {
34                $crate::maybe_send_to_next!(
35                    &merged_msg,
36                    $output_queue,
37                    $channel,
38                    $msg.into(),
39                    $accept_failure,
40                    $exchange,
41                )
42            }
43        }
44    };
45}