dge_runtime/component/
user_handler.rs

1#[macro_export]
2macro_rules! user_handler {
3    (
4        state=$state:ident, channel=$channel:ident, msg=$msg:ident,
5        user_handler=$user_handler:path,
6        accept_failure=$accept_failure:path,
7        output_queue=$output_queue:expr,
8        exchange=$exchange:expr $(,)?
9    ) => {
10        match $user_handler($state, &$msg).await {
11            Err(user_error) => {
12                // TODO @incomplete: for now treat user error as final, this should be recosnidered
13                warn!(
14                    "failed to process message: {:?}, error is: {}",
15                    &$msg, &user_error
16                );
17                let () =
18                    $accept_failure($msg.into(), user_error)
19                        .await
20                        .map_err(|ue| Error::UserError {
21                            error: ue.to_string(),
22                        })?;
23                Ok(Responsibility::Accept)
24            }
25            Ok(out_msg) => {
26                $crate::maybe_send_to_next!(
27                    &out_msg,
28                    $output_queue,
29                    $channel,
30                    $msg.into(),
31                    $accept_failure,
32                    $exchange,
33                )
34            }
35        }
36    };
37}