Skip to main content

cdk_sql_common/mint/
saga.rs

1//! Saga database implementation
2
3use std::str::FromStr;
4
5use async_trait::async_trait;
6use cdk_common::database::mint::{SagaDatabase, SagaTransaction};
7use cdk_common::database::Error;
8use cdk_common::mint;
9use cdk_common::util::unix_time;
10use serde_json;
11
12use super::{SQLMintDatabase, SQLTransaction};
13use crate::pool::DatabasePool;
14use crate::stmt::{query, Column};
15use crate::{column_as_number, column_as_string, unpack_into};
16
17fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
18    unpack_into!(
19        let (
20            operation_id,
21            operation_kind,
22            state,
23            quote_id,
24            finalization_data,
25            created_at,
26            updated_at
27        ) = row
28    );
29
30    let operation_id_str = column_as_string!(&operation_id);
31    let operation_id = uuid::Uuid::parse_str(&operation_id_str)
32        .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {e}")))?;
33
34    let operation_kind_str = column_as_string!(&operation_kind);
35    let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
36        .map_err(|e| Error::Internal(format!("Invalid operation kind: {e}")))?;
37
38    let state_str = column_as_string!(&state);
39    let state = mint::SagaStateEnum::new(operation_kind, &state_str)
40        .map_err(|e| Error::Internal(format!("Invalid saga state: {e}")))?;
41
42    let quote_id = match &quote_id {
43        Column::Text(s) => {
44            if s.is_empty() {
45                None
46            } else {
47                Some(s.clone())
48            }
49        }
50        Column::Null => None,
51        _ => None,
52    };
53
54    let finalization_data =
55        match &finalization_data {
56            Column::Text(s) => Some(serde_json::from_str(s).map_err(|e| {
57                Error::Internal(format!("Invalid melt finalization data JSON: {e}"))
58            })?),
59            Column::Null => None,
60            _ => None,
61        };
62
63    let created_at: u64 = column_as_number!(created_at);
64    let updated_at: u64 = column_as_number!(updated_at);
65
66    Ok(mint::Saga {
67        operation_id,
68        operation_kind,
69        state,
70        quote_id,
71        finalization_data,
72        created_at,
73        updated_at,
74    })
75}
76
77#[async_trait]
78impl<RM> SagaTransaction for SQLTransaction<RM>
79where
80    RM: DatabasePool + 'static,
81{
82    type Err = Error;
83
84    async fn get_saga(
85        &mut self,
86        operation_id: &uuid::Uuid,
87    ) -> Result<Option<mint::Saga>, Self::Err> {
88        Ok(query(
89            r#"
90            SELECT
91                operation_id,
92                operation_kind,
93                state,
94                quote_id,
95                finalization_data,
96                created_at,
97                updated_at
98            FROM
99                saga_state
100            WHERE
101                operation_id = :operation_id
102            FOR UPDATE
103            "#,
104        )?
105        .bind("operation_id", operation_id.to_string())
106        .fetch_one(&self.inner)
107        .await?
108        .map(sql_row_to_saga)
109        .transpose()?)
110    }
111
112    async fn add_saga(&mut self, saga: &mint::Saga) -> Result<(), Self::Err> {
113        let current_time = unix_time();
114
115        query(
116            r#"
117            INSERT INTO saga_state
118            (operation_id, operation_kind, state, quote_id, finalization_data, created_at, updated_at)
119            VALUES
120            (:operation_id, :operation_kind, :state, :quote_id, :finalization_data, :created_at, :updated_at)
121            "#,
122        )?
123        .bind("operation_id", saga.operation_id.to_string())
124        .bind("operation_kind", saga.operation_kind.to_string())
125        .bind("state", saga.state.state())
126        .bind("quote_id", saga.quote_id.as_deref())
127        .bind(
128            "finalization_data",
129            saga.finalization_data
130                .as_ref()
131                .map(serde_json::to_string)
132                .transpose()
133                .map_err(|e| Error::Internal(format!("Failed to serialize melt finalization data: {e}")))?,
134        )
135        .bind("created_at", saga.created_at as i64)
136        .bind("updated_at", current_time as i64)
137        .execute(&self.inner)
138        .await?;
139
140        Ok(())
141    }
142
143    async fn update_saga(
144        &mut self,
145        operation_id: &uuid::Uuid,
146        new_state: mint::SagaStateEnum,
147    ) -> Result<(), Self::Err> {
148        let current_time = unix_time();
149
150        query(
151            r#"
152            UPDATE saga_state
153            SET state = :state, updated_at = :updated_at
154            WHERE operation_id = :operation_id
155            "#,
156        )?
157        .bind("state", new_state.state())
158        .bind("updated_at", current_time as i64)
159        .bind("operation_id", operation_id.to_string())
160        .execute(&self.inner)
161        .await?;
162
163        Ok(())
164    }
165
166    async fn update_saga_with_finalization_data(
167        &mut self,
168        operation_id: &uuid::Uuid,
169        new_state: mint::SagaStateEnum,
170        finalization_data: Option<&mint::MeltFinalizationData>,
171    ) -> Result<(), Self::Err> {
172        let current_time = unix_time();
173
174        query(
175            r#"
176            UPDATE saga_state
177            SET state = :state, finalization_data = :finalization_data, updated_at = :updated_at
178            WHERE operation_id = :operation_id
179            "#,
180        )?
181        .bind("state", new_state.state())
182        .bind(
183            "finalization_data",
184            finalization_data
185                .map(serde_json::to_string)
186                .transpose()
187                .map_err(|e| {
188                    Error::Internal(format!("Failed to serialize melt finalization data: {e}"))
189                })?,
190        )
191        .bind("updated_at", current_time as i64)
192        .bind("operation_id", operation_id.to_string())
193        .execute(&self.inner)
194        .await?;
195
196        Ok(())
197    }
198
199    async fn delete_saga(&mut self, operation_id: &uuid::Uuid) -> Result<(), Self::Err> {
200        query(
201            r#"
202            DELETE FROM saga_state
203            WHERE operation_id = :operation_id
204            "#,
205        )?
206        .bind("operation_id", operation_id.to_string())
207        .execute(&self.inner)
208        .await?;
209
210        Ok(())
211    }
212}
213
214#[async_trait]
215impl<RM> SagaDatabase for SQLMintDatabase<RM>
216where
217    RM: DatabasePool + 'static,
218{
219    type Err = Error;
220
221    async fn get_melt_saga_by_quote_id(
222        &self,
223        quote_id: &cdk_common::QuoteId,
224    ) -> Result<Option<mint::Saga>, Self::Err> {
225        let conn = self
226            .pool
227            .get()
228            .await
229            .map_err(|e| Error::Database(Box::new(e)))?;
230        Ok(query(
231            r#"
232            SELECT
233                operation_id,
234                operation_kind,
235                state,
236                quote_id,
237                finalization_data,
238                created_at,
239                updated_at
240            FROM
241                saga_state
242            WHERE
243                quote_id = :quote_id
244                AND operation_kind = :operation_kind
245            "#,
246        )?
247        .bind("quote_id", quote_id.to_string())
248        .bind("operation_kind", mint::OperationKind::Melt.to_string())
249        .fetch_one(&*conn)
250        .await?
251        .map(sql_row_to_saga)
252        .transpose()?)
253    }
254
255    async fn get_incomplete_sagas(
256        &self,
257        operation_kind: mint::OperationKind,
258    ) -> Result<Vec<mint::Saga>, Self::Err> {
259        let conn = self
260            .pool
261            .get()
262            .await
263            .map_err(|e| Error::Database(Box::new(e)))?;
264        Ok(query(
265            r#"
266            SELECT
267                operation_id,
268                operation_kind,
269                state,
270                quote_id,
271                finalization_data,
272                created_at,
273                updated_at
274            FROM
275                saga_state
276            WHERE
277                operation_kind = :operation_kind
278            ORDER BY created_at ASC
279            "#,
280        )?
281        .bind("operation_kind", operation_kind.to_string())
282        .fetch_all(&*conn)
283        .await?
284        .into_iter()
285        .map(sql_row_to_saga)
286        .collect::<Result<Vec<_>, _>>()?)
287    }
288}