appletheia_application/command/
default_command_dispatcher.rs1use crate::command::{
2 Command, CommandDispatchError, CommandDispatcher, CommandFailureReport, CommandHandler,
3 CommandHasher, IdempotencyBeginResult, IdempotencyOutput, IdempotencyService, IdempotencyState,
4};
5use crate::request_context::RequestContext;
6use crate::unit_of_work::UnitOfWork;
7use crate::unit_of_work::UnitOfWorkFactory;
8
9#[derive(Debug)]
10pub struct DefaultCommandDispatcher<CH, IS, U> {
11 command_hasher: CH,
12 idempotency_service: IS,
13 uow_factory: U,
14}
15
16impl<CH, IS, U> DefaultCommandDispatcher<CH, IS, U>
17where
18 CH: CommandHasher,
19 IS: IdempotencyService,
20 U: UnitOfWorkFactory<Uow = IS::Uow>,
21{
22 pub fn new(command_hasher: CH, idempotency_service: IS, uow_factory: U) -> Self {
23 Self {
24 command_hasher,
25 idempotency_service,
26 uow_factory,
27 }
28 }
29
30 pub fn command_hasher(&self) -> &CH {
31 &self.command_hasher
32 }
33
34 pub fn idempotency_service(&self) -> &IS {
35 &self.idempotency_service
36 }
37}
38
39impl<CH, IS, U> CommandDispatcher for DefaultCommandDispatcher<CH, IS, U>
40where
41 CH: CommandHasher,
42 IS: IdempotencyService,
43 U: UnitOfWorkFactory<Uow = IS::Uow>,
44{
45 type Uow = IS::Uow;
46
47 async fn dispatch<H>(
48 &self,
49 handler: &H,
50 request_context: &RequestContext,
51 command: H::Command,
52 ) -> Result<H::Output, CommandDispatchError<H::Error>>
53 where
54 H: CommandHandler<Uow = Self::Uow>,
55 H::Command: Command,
56 {
57 let command_name = H::Command::NAME;
58 let command_hash = self.command_hasher.command_hash(&command)?;
59 let message_id = request_context.message_id;
60
61 let mut uow = self.uow_factory.begin().await?;
62
63 let idempotency_begin_result = self
64 .idempotency_service
65 .begin(&mut uow, message_id, command_name, &command_hash)
66 .await;
67
68 let idempotency_begin_result = match idempotency_begin_result {
69 Ok(value) => value,
70 Err(operation_error) => {
71 let operation_error = uow.rollback_with_operation_error(operation_error).await?;
72 return Err(operation_error.into());
73 }
74 };
75
76 match idempotency_begin_result {
77 IdempotencyBeginResult::New => {}
78 IdempotencyBeginResult::InProgress => match uow.rollback().await {
79 Ok(()) => return Err(CommandDispatchError::InProgress { message_id }),
80 Err(rollback_error) => return Err(rollback_error.into()),
81 },
82 IdempotencyBeginResult::Existing { state } => match state {
83 IdempotencyState::Succeeded { output } => {
84 let decoded = serde_json::from_value(output.into())?;
85 uow.commit().await?;
86 return Ok(decoded);
87 }
88 IdempotencyState::Failed { error } => {
89 uow.commit().await?;
90 return Err(CommandDispatchError::PreviousFailure(error));
91 }
92 },
93 }
94
95 let handler_result = handler.handle(&mut uow, request_context, command).await;
96
97 match handler_result {
98 Ok(output) => {
99 let output_json = serde_json::to_value(&output)?;
100 match self
101 .idempotency_service
102 .complete_success(&mut uow, message_id, IdempotencyOutput::from(output_json))
103 .await
104 {
105 Ok(()) => {}
106 Err(operation_error) => {
107 let operation_error =
108 uow.rollback_with_operation_error(operation_error).await?;
109 return Err(operation_error.into());
110 }
111 }
112 uow.commit().await?;
113 Ok(output)
114 }
115 Err(operation_error) => {
116 let operation_error = uow
117 .rollback_with_operation_error(operation_error)
118 .await
119 .map_err(CommandDispatchError::UnitOfWork)?;
120
121 let report = CommandFailureReport::from(&operation_error);
122 if let Ok(mut uow) = self.uow_factory.begin().await {
123 let idempotency_begin_result = self
124 .idempotency_service
125 .begin(&mut uow, message_id, command_name, &command_hash)
126 .await;
127 match idempotency_begin_result {
128 Ok(IdempotencyBeginResult::New) => {
129 match self
130 .idempotency_service
131 .complete_failure(&mut uow, message_id, report)
132 .await
133 {
134 Ok(()) => {
135 let _ = uow.commit().await;
136 }
137 Err(_) => {
138 let _ = uow.rollback().await;
139 }
140 }
141 }
142 Ok(IdempotencyBeginResult::Existing { .. }) => {
143 let _ = uow.commit().await;
144 }
145 Ok(IdempotencyBeginResult::InProgress) => {
146 let _ = uow.rollback().await;
147 }
148 Err(_) => {
149 let _ = uow.rollback().await;
150 }
151 }
152 }
153 Err(CommandDispatchError::Handler(operation_error))
154 }
155 }
156 }
157}