Skip to main content

cdk_sql_common/mint/
saga.rs

1//! Saga database implementation
2
3use std::str::FromStr;
4
5use async_trait::async_trait;
6use cdk_common::database::mint::{SagaDatabase, SagaTransaction};
7use cdk_common::database::Error;
8use cdk_common::mint;
9use cdk_common::util::unix_time;
10
11use super::{SQLMintDatabase, SQLTransaction};
12use crate::pool::DatabasePool;
13use crate::stmt::{query, Column};
14use crate::{column_as_number, column_as_string, unpack_into};
15
16fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
17    unpack_into!(
18        let (
19            operation_id,
20            operation_kind,
21            state,
22            quote_id,
23            created_at,
24            updated_at
25        ) = row
26    );
27
28    let operation_id_str = column_as_string!(&operation_id);
29    let operation_id = uuid::Uuid::parse_str(&operation_id_str)
30        .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {e}")))?;
31
32    let operation_kind_str = column_as_string!(&operation_kind);
33    let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
34        .map_err(|e| Error::Internal(format!("Invalid operation kind: {e}")))?;
35
36    let state_str = column_as_string!(&state);
37    let state = mint::SagaStateEnum::new(operation_kind, &state_str)
38        .map_err(|e| Error::Internal(format!("Invalid saga state: {e}")))?;
39
40    let quote_id = match &quote_id {
41        Column::Text(s) => {
42            if s.is_empty() {
43                None
44            } else {
45                Some(s.clone())
46            }
47        }
48        Column::Null => None,
49        _ => None,
50    };
51
52    let created_at: u64 = column_as_number!(created_at);
53    let updated_at: u64 = column_as_number!(updated_at);
54
55    Ok(mint::Saga {
56        operation_id,
57        operation_kind,
58        state,
59        quote_id,
60        created_at,
61        updated_at,
62    })
63}
64
65#[async_trait]
66impl<RM> SagaTransaction for SQLTransaction<RM>
67where
68    RM: DatabasePool + 'static,
69{
70    type Err = Error;
71
72    async fn get_saga(
73        &mut self,
74        operation_id: &uuid::Uuid,
75    ) -> Result<Option<mint::Saga>, Self::Err> {
76        Ok(query(
77            r#"
78            SELECT
79                operation_id,
80                operation_kind,
81                state,
82                quote_id,
83                created_at,
84                updated_at
85            FROM
86                saga_state
87            WHERE
88                operation_id = :operation_id
89            FOR UPDATE
90            "#,
91        )?
92        .bind("operation_id", operation_id.to_string())
93        .fetch_one(&self.inner)
94        .await?
95        .map(sql_row_to_saga)
96        .transpose()?)
97    }
98
99    async fn add_saga(&mut self, saga: &mint::Saga) -> Result<(), Self::Err> {
100        let current_time = unix_time();
101
102        query(
103            r#"
104            INSERT INTO saga_state
105            (operation_id, operation_kind, state, quote_id, created_at, updated_at)
106            VALUES
107            (:operation_id, :operation_kind, :state, :quote_id, :created_at, :updated_at)
108            "#,
109        )?
110        .bind("operation_id", saga.operation_id.to_string())
111        .bind("operation_kind", saga.operation_kind.to_string())
112        .bind("state", saga.state.state())
113        .bind("quote_id", saga.quote_id.as_deref())
114        .bind("created_at", saga.created_at as i64)
115        .bind("updated_at", current_time as i64)
116        .execute(&self.inner)
117        .await?;
118
119        Ok(())
120    }
121
122    async fn update_saga(
123        &mut self,
124        operation_id: &uuid::Uuid,
125        new_state: mint::SagaStateEnum,
126    ) -> Result<(), Self::Err> {
127        let current_time = unix_time();
128
129        query(
130            r#"
131            UPDATE saga_state
132            SET state = :state, updated_at = :updated_at
133            WHERE operation_id = :operation_id
134            "#,
135        )?
136        .bind("state", new_state.state())
137        .bind("updated_at", current_time as i64)
138        .bind("operation_id", operation_id.to_string())
139        .execute(&self.inner)
140        .await?;
141
142        Ok(())
143    }
144
145    async fn delete_saga(&mut self, operation_id: &uuid::Uuid) -> Result<(), Self::Err> {
146        query(
147            r#"
148            DELETE FROM saga_state
149            WHERE operation_id = :operation_id
150            "#,
151        )?
152        .bind("operation_id", operation_id.to_string())
153        .execute(&self.inner)
154        .await?;
155
156        Ok(())
157    }
158}
159
160#[async_trait]
161impl<RM> SagaDatabase for SQLMintDatabase<RM>
162where
163    RM: DatabasePool + 'static,
164{
165    type Err = Error;
166
167    async fn get_incomplete_sagas(
168        &self,
169        operation_kind: mint::OperationKind,
170    ) -> Result<Vec<mint::Saga>, Self::Err> {
171        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
172        Ok(query(
173            r#"
174            SELECT
175                operation_id,
176                operation_kind,
177                state,
178                quote_id,
179                created_at,
180                updated_at
181            FROM
182                saga_state
183            WHERE
184                operation_kind = :operation_kind
185            ORDER BY created_at ASC
186            "#,
187        )?
188        .bind("operation_kind", operation_kind.to_string())
189        .fetch_all(&*conn)
190        .await?
191        .into_iter()
192        .map(sql_row_to_saga)
193        .collect::<Result<Vec<_>, _>>()?)
194    }
195}