Skip to main content

// File: appletheia_application/command/default_command_dispatcher.rs

1use crate::command::{
2    Command, CommandDispatchError, CommandDispatcher, CommandFailureReport, CommandHandler,
3    CommandHasher, IdempotencyBeginResult, IdempotencyOutput, IdempotencyService, IdempotencyState,
4};
5use crate::request_context::RequestContext;
6use crate::unit_of_work::UnitOfWork;
7use crate::unit_of_work::UnitOfWorkFactory;
8
/// Command dispatcher assembled from three collaborators: a command hasher,
/// an idempotency service and a unit-of-work factory.
///
/// The struct itself places no bounds on its type parameters; the trait
/// requirements live on the `impl` blocks that need them.
#[derive(Debug)]
pub struct DefaultCommandDispatcher<CH, IS, U> {
    /// Produces the hash of a command (called as `command_hash(&command)`).
    command_hasher: CH,
    /// Tracks per-message idempotency state (`begin`, `complete_success`,
    /// `complete_failure`).
    idempotency_service: IS,
    /// Opens a new unit of work via `begin()`.
    uow_factory: U,
}
15
16impl<CH, IS, U> DefaultCommandDispatcher<CH, IS, U>
17where
18    CH: CommandHasher,
19    IS: IdempotencyService,
20    U: UnitOfWorkFactory<Uow = IS::Uow>,
21{
22    pub fn new(command_hasher: CH, idempotency_service: IS, uow_factory: U) -> Self {
23        Self {
24            command_hasher,
25            idempotency_service,
26            uow_factory,
27        }
28    }
29
30    pub fn command_hasher(&self) -> &CH {
31        &self.command_hasher
32    }
33
34    pub fn idempotency_service(&self) -> &IS {
35        &self.idempotency_service
36    }
37}
38
39impl<CH, IS, U> CommandDispatcher for DefaultCommandDispatcher<CH, IS, U>
40where
41    CH: CommandHasher,
42    IS: IdempotencyService,
43    U: UnitOfWorkFactory<Uow = IS::Uow>,
44{
45    type Uow = IS::Uow;
46
47    async fn dispatch<H>(
48        &self,
49        handler: &H,
50        request_context: &RequestContext,
51        command: H::Command,
52    ) -> Result<H::Output, CommandDispatchError<H::Error>>
53    where
54        H: CommandHandler<Uow = Self::Uow>,
55        H::Command: Command,
56    {
57        let command_name = H::Command::NAME;
58        let command_hash = self.command_hasher.command_hash(&command)?;
59        let message_id = request_context.message_id;
60
61        let mut uow = self.uow_factory.begin().await?;
62
63        let idempotency_begin_result = self
64            .idempotency_service
65            .begin(&mut uow, message_id, command_name, &command_hash)
66            .await;
67
68        let idempotency_begin_result = match idempotency_begin_result {
69            Ok(value) => value,
70            Err(operation_error) => {
71                let operation_error = uow.rollback_with_operation_error(operation_error).await?;
72                return Err(operation_error.into());
73            }
74        };
75
76        match idempotency_begin_result {
77            IdempotencyBeginResult::New => {}
78            IdempotencyBeginResult::InProgress => match uow.rollback().await {
79                Ok(()) => return Err(CommandDispatchError::InProgress { message_id }),
80                Err(rollback_error) => return Err(rollback_error.into()),
81            },
82            IdempotencyBeginResult::Existing { state } => match state {
83                IdempotencyState::Succeeded { output } => {
84                    let decoded = serde_json::from_value(output.into())?;
85                    uow.commit().await?;
86                    return Ok(decoded);
87                }
88                IdempotencyState::Failed { error } => {
89                    uow.commit().await?;
90                    return Err(CommandDispatchError::PreviousFailure(error));
91                }
92            },
93        }
94
95        let handler_result = handler.handle(&mut uow, request_context, command).await;
96
97        match handler_result {
98            Ok(output) => {
99                let output_json = serde_json::to_value(&output)?;
100                match self
101                    .idempotency_service
102                    .complete_success(&mut uow, message_id, IdempotencyOutput::from(output_json))
103                    .await
104                {
105                    Ok(()) => {}
106                    Err(operation_error) => {
107                        let operation_error =
108                            uow.rollback_with_operation_error(operation_error).await?;
109                        return Err(operation_error.into());
110                    }
111                }
112                uow.commit().await?;
113                Ok(output)
114            }
115            Err(operation_error) => {
116                let operation_error = uow
117                    .rollback_with_operation_error(operation_error)
118                    .await
119                    .map_err(CommandDispatchError::UnitOfWork)?;
120
121                let report = CommandFailureReport::from(&operation_error);
122                if let Ok(mut uow) = self.uow_factory.begin().await {
123                    let idempotency_begin_result = self
124                        .idempotency_service
125                        .begin(&mut uow, message_id, command_name, &command_hash)
126                        .await;
127                    match idempotency_begin_result {
128                        Ok(IdempotencyBeginResult::New) => {
129                            match self
130                                .idempotency_service
131                                .complete_failure(&mut uow, message_id, report)
132                                .await
133                            {
134                                Ok(()) => {
135                                    let _ = uow.commit().await;
136                                }
137                                Err(_) => {
138                                    let _ = uow.rollback().await;
139                                }
140                            }
141                        }
142                        Ok(IdempotencyBeginResult::Existing { .. }) => {
143                            let _ = uow.commit().await;
144                        }
145                        Ok(IdempotencyBeginResult::InProgress) => {
146                            let _ = uow.rollback().await;
147                        }
148                        Err(_) => {
149                            let _ = uow.rollback().await;
150                        }
151                    }
152                }
153                Err(CommandDispatchError::Handler(operation_error))
154            }
155        }
156    }
157}