1use std::str::FromStr;
4
5use async_trait::async_trait;
6use cdk_common::database::mint::{SagaDatabase, SagaTransaction};
7use cdk_common::database::Error;
8use cdk_common::mint;
9use cdk_common::util::unix_time;
10use serde_json;
11
12use super::{SQLMintDatabase, SQLTransaction};
13use crate::pool::DatabasePool;
14use crate::stmt::{query, Column};
15use crate::{column_as_number, column_as_string, unpack_into};
16
17fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
18 unpack_into!(
19 let (
20 operation_id,
21 operation_kind,
22 state,
23 quote_id,
24 finalization_data,
25 created_at,
26 updated_at
27 ) = row
28 );
29
30 let operation_id_str = column_as_string!(&operation_id);
31 let operation_id = uuid::Uuid::parse_str(&operation_id_str)
32 .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {e}")))?;
33
34 let operation_kind_str = column_as_string!(&operation_kind);
35 let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
36 .map_err(|e| Error::Internal(format!("Invalid operation kind: {e}")))?;
37
38 let state_str = column_as_string!(&state);
39 let state = mint::SagaStateEnum::new(operation_kind, &state_str)
40 .map_err(|e| Error::Internal(format!("Invalid saga state: {e}")))?;
41
42 let quote_id = match "e_id {
43 Column::Text(s) => {
44 if s.is_empty() {
45 None
46 } else {
47 Some(s.clone())
48 }
49 }
50 Column::Null => None,
51 _ => None,
52 };
53
54 let finalization_data =
55 match &finalization_data {
56 Column::Text(s) => Some(serde_json::from_str(s).map_err(|e| {
57 Error::Internal(format!("Invalid melt finalization data JSON: {e}"))
58 })?),
59 Column::Null => None,
60 _ => None,
61 };
62
63 let created_at: u64 = column_as_number!(created_at);
64 let updated_at: u64 = column_as_number!(updated_at);
65
66 Ok(mint::Saga {
67 operation_id,
68 operation_kind,
69 state,
70 quote_id,
71 finalization_data,
72 created_at,
73 updated_at,
74 })
75}
76
77#[async_trait]
78impl<RM> SagaTransaction for SQLTransaction<RM>
79where
80 RM: DatabasePool + 'static,
81{
82 type Err = Error;
83
84 async fn get_saga(
85 &mut self,
86 operation_id: &uuid::Uuid,
87 ) -> Result<Option<mint::Saga>, Self::Err> {
88 Ok(query(
89 r#"
90 SELECT
91 operation_id,
92 operation_kind,
93 state,
94 quote_id,
95 finalization_data,
96 created_at,
97 updated_at
98 FROM
99 saga_state
100 WHERE
101 operation_id = :operation_id
102 FOR UPDATE
103 "#,
104 )?
105 .bind("operation_id", operation_id.to_string())
106 .fetch_one(&self.inner)
107 .await?
108 .map(sql_row_to_saga)
109 .transpose()?)
110 }
111
112 async fn add_saga(&mut self, saga: &mint::Saga) -> Result<(), Self::Err> {
113 let current_time = unix_time();
114
115 query(
116 r#"
117 INSERT INTO saga_state
118 (operation_id, operation_kind, state, quote_id, finalization_data, created_at, updated_at)
119 VALUES
120 (:operation_id, :operation_kind, :state, :quote_id, :finalization_data, :created_at, :updated_at)
121 "#,
122 )?
123 .bind("operation_id", saga.operation_id.to_string())
124 .bind("operation_kind", saga.operation_kind.to_string())
125 .bind("state", saga.state.state())
126 .bind("quote_id", saga.quote_id.as_deref())
127 .bind(
128 "finalization_data",
129 saga.finalization_data
130 .as_ref()
131 .map(serde_json::to_string)
132 .transpose()
133 .map_err(|e| Error::Internal(format!("Failed to serialize melt finalization data: {e}")))?,
134 )
135 .bind("created_at", saga.created_at as i64)
136 .bind("updated_at", current_time as i64)
137 .execute(&self.inner)
138 .await?;
139
140 Ok(())
141 }
142
143 async fn update_saga(
144 &mut self,
145 operation_id: &uuid::Uuid,
146 new_state: mint::SagaStateEnum,
147 ) -> Result<(), Self::Err> {
148 let current_time = unix_time();
149
150 query(
151 r#"
152 UPDATE saga_state
153 SET state = :state, updated_at = :updated_at
154 WHERE operation_id = :operation_id
155 "#,
156 )?
157 .bind("state", new_state.state())
158 .bind("updated_at", current_time as i64)
159 .bind("operation_id", operation_id.to_string())
160 .execute(&self.inner)
161 .await?;
162
163 Ok(())
164 }
165
166 async fn update_saga_with_finalization_data(
167 &mut self,
168 operation_id: &uuid::Uuid,
169 new_state: mint::SagaStateEnum,
170 finalization_data: Option<&mint::MeltFinalizationData>,
171 ) -> Result<(), Self::Err> {
172 let current_time = unix_time();
173
174 query(
175 r#"
176 UPDATE saga_state
177 SET state = :state, finalization_data = :finalization_data, updated_at = :updated_at
178 WHERE operation_id = :operation_id
179 "#,
180 )?
181 .bind("state", new_state.state())
182 .bind(
183 "finalization_data",
184 finalization_data
185 .map(serde_json::to_string)
186 .transpose()
187 .map_err(|e| {
188 Error::Internal(format!("Failed to serialize melt finalization data: {e}"))
189 })?,
190 )
191 .bind("updated_at", current_time as i64)
192 .bind("operation_id", operation_id.to_string())
193 .execute(&self.inner)
194 .await?;
195
196 Ok(())
197 }
198
199 async fn delete_saga(&mut self, operation_id: &uuid::Uuid) -> Result<(), Self::Err> {
200 query(
201 r#"
202 DELETE FROM saga_state
203 WHERE operation_id = :operation_id
204 "#,
205 )?
206 .bind("operation_id", operation_id.to_string())
207 .execute(&self.inner)
208 .await?;
209
210 Ok(())
211 }
212}
213
214#[async_trait]
215impl<RM> SagaDatabase for SQLMintDatabase<RM>
216where
217 RM: DatabasePool + 'static,
218{
219 type Err = Error;
220
221 async fn get_melt_saga_by_quote_id(
222 &self,
223 quote_id: &cdk_common::QuoteId,
224 ) -> Result<Option<mint::Saga>, Self::Err> {
225 let conn = self
226 .pool
227 .get()
228 .await
229 .map_err(|e| Error::Database(Box::new(e)))?;
230 Ok(query(
231 r#"
232 SELECT
233 operation_id,
234 operation_kind,
235 state,
236 quote_id,
237 finalization_data,
238 created_at,
239 updated_at
240 FROM
241 saga_state
242 WHERE
243 quote_id = :quote_id
244 AND operation_kind = :operation_kind
245 "#,
246 )?
247 .bind("quote_id", quote_id.to_string())
248 .bind("operation_kind", mint::OperationKind::Melt.to_string())
249 .fetch_one(&*conn)
250 .await?
251 .map(sql_row_to_saga)
252 .transpose()?)
253 }
254
255 async fn get_incomplete_sagas(
256 &self,
257 operation_kind: mint::OperationKind,
258 ) -> Result<Vec<mint::Saga>, Self::Err> {
259 let conn = self
260 .pool
261 .get()
262 .await
263 .map_err(|e| Error::Database(Box::new(e)))?;
264 Ok(query(
265 r#"
266 SELECT
267 operation_id,
268 operation_kind,
269 state,
270 quote_id,
271 finalization_data,
272 created_at,
273 updated_at
274 FROM
275 saga_state
276 WHERE
277 operation_kind = :operation_kind
278 ORDER BY created_at ASC
279 "#,
280 )?
281 .bind("operation_kind", operation_kind.to_string())
282 .fetch_all(&*conn)
283 .await?
284 .into_iter()
285 .map(sql_row_to_saga)
286 .collect::<Result<Vec<_>, _>>()?)
287 }
288}