Skip to main content

appletheia_application/command/
default_command_dispatcher.rs

1use crate::command::{
2    Command, CommandDispatchError, CommandDispatcher, CommandFailureReport, CommandHandler,
3    CommandHasher,
4};
5use crate::idempotency::{
6    IdempotencyBeginResult, IdempotencyOutput, IdempotencyService, IdempotencyState,
7};
8use crate::request_context::RequestContext;
9use crate::unit_of_work::UnitOfWork;
10use crate::unit_of_work::UnitOfWorkFactory;
11
/// Command dispatcher assembled from three collaborators: a command hasher,
/// an idempotency service and a unit-of-work factory.
///
/// The struct itself places no bounds on its type parameters; the bounds
/// live on the `impl` blocks that need them.
#[derive(Debug)]
pub struct DefaultCommandDispatcher<CH, IS, U> {
    /// Produces the hash of a command that is handed to the idempotency
    /// service when a dispatch begins.
    command_hasher: CH,
    /// Tracks, per message id, whether a command is new, in progress, or
    /// already finished.
    idempotency_service: IS,
    /// Opens the unit of work in which a dispatch runs.
    uow_factory: U,
}
18
19impl<CH, IS, U> DefaultCommandDispatcher<CH, IS, U>
20where
21    CH: CommandHasher,
22    IS: IdempotencyService,
23    U: UnitOfWorkFactory<Uow = IS::Uow>,
24{
25    pub fn new(command_hasher: CH, idempotency_service: IS, uow_factory: U) -> Self {
26        Self {
27            command_hasher,
28            idempotency_service,
29            uow_factory,
30        }
31    }
32
33    pub fn command_hasher(&self) -> &CH {
34        &self.command_hasher
35    }
36
37    pub fn idempotency_service(&self) -> &IS {
38        &self.idempotency_service
39    }
40}
41
42impl<CH, IS, U> CommandDispatcher for DefaultCommandDispatcher<CH, IS, U>
43where
44    CH: CommandHasher,
45    IS: IdempotencyService,
46    U: UnitOfWorkFactory<Uow = IS::Uow>,
47{
48    type Uow = IS::Uow;
49
50    async fn dispatch<H>(
51        &self,
52        handler: &H,
53        request_context: &RequestContext,
54        command: H::Command,
55    ) -> Result<H::Output, CommandDispatchError<H::Error>>
56    where
57        H: CommandHandler<Uow = Self::Uow>,
58        H::Command: Command,
59    {
60        let command_name = H::Command::NAME;
61        let command_hash = self.command_hasher.command_hash(&command)?;
62        let message_id = request_context.message_id;
63
64        let mut uow = self.uow_factory.begin().await?;
65
66        let idempotency_begin_result = self
67            .idempotency_service
68            .begin(&mut uow, message_id, command_name, &command_hash)
69            .await;
70
71        let idempotency_begin_result = match idempotency_begin_result {
72            Ok(value) => value,
73            Err(operation_error) => {
74                let operation_error = uow.rollback_with_operation_error(operation_error).await?;
75                return Err(operation_error.into());
76            }
77        };
78
79        match idempotency_begin_result {
80            IdempotencyBeginResult::New => {}
81            IdempotencyBeginResult::InProgress => match uow.rollback().await {
82                Ok(()) => return Err(CommandDispatchError::InProgress { message_id }),
83                Err(rollback_error) => return Err(rollback_error.into()),
84            },
85            IdempotencyBeginResult::Existing { state } => match state {
86                IdempotencyState::Succeeded { output } => {
87                    let decoded = serde_json::from_value(output.into())?;
88                    uow.commit().await?;
89                    return Ok(decoded);
90                }
91                IdempotencyState::Failed { error } => {
92                    uow.commit().await?;
93                    return Err(CommandDispatchError::PreviousFailure(error));
94                }
95            },
96        }
97
98        let handler_result = handler.handle(&mut uow, request_context, command).await;
99
100        match handler_result {
101            Ok(output) => {
102                let output_json = serde_json::to_value(&output)?;
103                match self
104                    .idempotency_service
105                    .complete_success(&mut uow, message_id, IdempotencyOutput::from(output_json))
106                    .await
107                {
108                    Ok(()) => {}
109                    Err(operation_error) => {
110                        let operation_error =
111                            uow.rollback_with_operation_error(operation_error).await?;
112                        return Err(operation_error.into());
113                    }
114                }
115                uow.commit().await?;
116                Ok(output)
117            }
118            Err(operation_error) => {
119                let operation_error = uow
120                    .rollback_with_operation_error(operation_error)
121                    .await
122                    .map_err(CommandDispatchError::UnitOfWork)?;
123
124                let report = CommandFailureReport::from(&operation_error);
125                if let Ok(mut uow) = self.uow_factory.begin().await {
126                    let idempotency_begin_result = self
127                        .idempotency_service
128                        .begin(&mut uow, message_id, command_name, &command_hash)
129                        .await;
130                    match idempotency_begin_result {
131                        Ok(IdempotencyBeginResult::New) => {
132                            match self
133                                .idempotency_service
134                                .complete_failure(&mut uow, message_id, report)
135                                .await
136                            {
137                                Ok(()) => {
138                                    let _ = uow.commit().await;
139                                }
140                                Err(_) => {
141                                    let _ = uow.rollback().await;
142                                }
143                            }
144                        }
145                        Ok(IdempotencyBeginResult::Existing { .. }) => {
146                            let _ = uow.commit().await;
147                        }
148                        Ok(IdempotencyBeginResult::InProgress) => {
149                            let _ = uow.rollback().await;
150                        }
151                        Err(_) => {
152                            let _ = uow.rollback().await;
153                        }
154                    }
155                }
156                Err(CommandDispatchError::Handler(operation_error))
157            }
158        }
159    }
160}