appletheia_application/command/
default_command_dispatcher.rs1use crate::command::{
2 Command, CommandDispatchError, CommandDispatcher, CommandFailureReport, CommandHandler,
3 CommandHasher,
4};
5use crate::idempotency::{
6 IdempotencyBeginResult, IdempotencyOutput, IdempotencyService, IdempotencyState,
7};
8use crate::request_context::RequestContext;
9use crate::unit_of_work::UnitOfWork;
10use crate::unit_of_work::UnitOfWorkFactory;
11
12#[derive(Debug)]
13pub struct DefaultCommandDispatcher<CH, IS, U> {
14 command_hasher: CH,
15 idempotency_service: IS,
16 uow_factory: U,
17}
18
19impl<CH, IS, U> DefaultCommandDispatcher<CH, IS, U>
20where
21 CH: CommandHasher,
22 IS: IdempotencyService,
23 U: UnitOfWorkFactory<Uow = IS::Uow>,
24{
25 pub fn new(command_hasher: CH, idempotency_service: IS, uow_factory: U) -> Self {
26 Self {
27 command_hasher,
28 idempotency_service,
29 uow_factory,
30 }
31 }
32
33 pub fn command_hasher(&self) -> &CH {
34 &self.command_hasher
35 }
36
37 pub fn idempotency_service(&self) -> &IS {
38 &self.idempotency_service
39 }
40}
41
42impl<CH, IS, U> CommandDispatcher for DefaultCommandDispatcher<CH, IS, U>
43where
44 CH: CommandHasher,
45 IS: IdempotencyService,
46 U: UnitOfWorkFactory<Uow = IS::Uow>,
47{
48 type Uow = IS::Uow;
49
50 async fn dispatch<H>(
51 &self,
52 handler: &H,
53 request_context: &RequestContext,
54 command: H::Command,
55 ) -> Result<H::Output, CommandDispatchError<H::Error>>
56 where
57 H: CommandHandler<Uow = Self::Uow>,
58 H::Command: Command,
59 {
60 let command_name = H::Command::NAME;
61 let command_hash = self.command_hasher.command_hash(&command)?;
62 let message_id = request_context.message_id;
63
64 let mut uow = self.uow_factory.begin().await?;
65
66 let idempotency_begin_result = self
67 .idempotency_service
68 .begin(&mut uow, message_id, command_name, &command_hash)
69 .await;
70
71 let idempotency_begin_result = match idempotency_begin_result {
72 Ok(value) => value,
73 Err(operation_error) => {
74 let operation_error = uow.rollback_with_operation_error(operation_error).await?;
75 return Err(operation_error.into());
76 }
77 };
78
79 match idempotency_begin_result {
80 IdempotencyBeginResult::New => {}
81 IdempotencyBeginResult::InProgress => match uow.rollback().await {
82 Ok(()) => return Err(CommandDispatchError::InProgress { message_id }),
83 Err(rollback_error) => return Err(rollback_error.into()),
84 },
85 IdempotencyBeginResult::Existing { state } => match state {
86 IdempotencyState::Succeeded { output } => {
87 let decoded = serde_json::from_value(output.into())?;
88 uow.commit().await?;
89 return Ok(decoded);
90 }
91 IdempotencyState::Failed { error } => {
92 uow.commit().await?;
93 return Err(CommandDispatchError::PreviousFailure(error));
94 }
95 },
96 }
97
98 let handler_result = handler.handle(&mut uow, request_context, command).await;
99
100 match handler_result {
101 Ok(output) => {
102 let output_json = serde_json::to_value(&output)?;
103 match self
104 .idempotency_service
105 .complete_success(&mut uow, message_id, IdempotencyOutput::from(output_json))
106 .await
107 {
108 Ok(()) => {}
109 Err(operation_error) => {
110 let operation_error =
111 uow.rollback_with_operation_error(operation_error).await?;
112 return Err(operation_error.into());
113 }
114 }
115 uow.commit().await?;
116 Ok(output)
117 }
118 Err(operation_error) => {
119 let operation_error = uow
120 .rollback_with_operation_error(operation_error)
121 .await
122 .map_err(CommandDispatchError::UnitOfWork)?;
123
124 let report = CommandFailureReport::from(&operation_error);
125 if let Ok(mut uow) = self.uow_factory.begin().await {
126 let idempotency_begin_result = self
127 .idempotency_service
128 .begin(&mut uow, message_id, command_name, &command_hash)
129 .await;
130 match idempotency_begin_result {
131 Ok(IdempotencyBeginResult::New) => {
132 match self
133 .idempotency_service
134 .complete_failure(&mut uow, message_id, report)
135 .await
136 {
137 Ok(()) => {
138 let _ = uow.commit().await;
139 }
140 Err(_) => {
141 let _ = uow.rollback().await;
142 }
143 }
144 }
145 Ok(IdempotencyBeginResult::Existing { .. }) => {
146 let _ = uow.commit().await;
147 }
148 Ok(IdempotencyBeginResult::InProgress) => {
149 let _ = uow.rollback().await;
150 }
151 Err(_) => {
152 let _ = uow.rollback().await;
153 }
154 }
155 }
156 Err(CommandDispatchError::Handler(operation_error))
157 }
158 }
159 }
160}