cdk_sql_common/mint/
saga.rs1use std::str::FromStr;
4
5use async_trait::async_trait;
6use cdk_common::database::mint::{SagaDatabase, SagaTransaction};
7use cdk_common::database::Error;
8use cdk_common::mint;
9use cdk_common::util::unix_time;
10
11use super::{SQLMintDatabase, SQLTransaction};
12use crate::pool::DatabasePool;
13use crate::stmt::{query, Column};
14use crate::{column_as_number, column_as_string, unpack_into};
15
16fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
17 unpack_into!(
18 let (
19 operation_id,
20 operation_kind,
21 state,
22 quote_id,
23 created_at,
24 updated_at
25 ) = row
26 );
27
28 let operation_id_str = column_as_string!(&operation_id);
29 let operation_id = uuid::Uuid::parse_str(&operation_id_str)
30 .map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {e}")))?;
31
32 let operation_kind_str = column_as_string!(&operation_kind);
33 let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
34 .map_err(|e| Error::Internal(format!("Invalid operation kind: {e}")))?;
35
36 let state_str = column_as_string!(&state);
37 let state = mint::SagaStateEnum::new(operation_kind, &state_str)
38 .map_err(|e| Error::Internal(format!("Invalid saga state: {e}")))?;
39
40 let quote_id = match "e_id {
41 Column::Text(s) => {
42 if s.is_empty() {
43 None
44 } else {
45 Some(s.clone())
46 }
47 }
48 Column::Null => None,
49 _ => None,
50 };
51
52 let created_at: u64 = column_as_number!(created_at);
53 let updated_at: u64 = column_as_number!(updated_at);
54
55 Ok(mint::Saga {
56 operation_id,
57 operation_kind,
58 state,
59 quote_id,
60 created_at,
61 updated_at,
62 })
63}
64
65#[async_trait]
66impl<RM> SagaTransaction for SQLTransaction<RM>
67where
68 RM: DatabasePool + 'static,
69{
70 type Err = Error;
71
72 async fn get_saga(
73 &mut self,
74 operation_id: &uuid::Uuid,
75 ) -> Result<Option<mint::Saga>, Self::Err> {
76 Ok(query(
77 r#"
78 SELECT
79 operation_id,
80 operation_kind,
81 state,
82 quote_id,
83 created_at,
84 updated_at
85 FROM
86 saga_state
87 WHERE
88 operation_id = :operation_id
89 FOR UPDATE
90 "#,
91 )?
92 .bind("operation_id", operation_id.to_string())
93 .fetch_one(&self.inner)
94 .await?
95 .map(sql_row_to_saga)
96 .transpose()?)
97 }
98
99 async fn add_saga(&mut self, saga: &mint::Saga) -> Result<(), Self::Err> {
100 let current_time = unix_time();
101
102 query(
103 r#"
104 INSERT INTO saga_state
105 (operation_id, operation_kind, state, quote_id, created_at, updated_at)
106 VALUES
107 (:operation_id, :operation_kind, :state, :quote_id, :created_at, :updated_at)
108 "#,
109 )?
110 .bind("operation_id", saga.operation_id.to_string())
111 .bind("operation_kind", saga.operation_kind.to_string())
112 .bind("state", saga.state.state())
113 .bind("quote_id", saga.quote_id.as_deref())
114 .bind("created_at", saga.created_at as i64)
115 .bind("updated_at", current_time as i64)
116 .execute(&self.inner)
117 .await?;
118
119 Ok(())
120 }
121
122 async fn update_saga(
123 &mut self,
124 operation_id: &uuid::Uuid,
125 new_state: mint::SagaStateEnum,
126 ) -> Result<(), Self::Err> {
127 let current_time = unix_time();
128
129 query(
130 r#"
131 UPDATE saga_state
132 SET state = :state, updated_at = :updated_at
133 WHERE operation_id = :operation_id
134 "#,
135 )?
136 .bind("state", new_state.state())
137 .bind("updated_at", current_time as i64)
138 .bind("operation_id", operation_id.to_string())
139 .execute(&self.inner)
140 .await?;
141
142 Ok(())
143 }
144
145 async fn delete_saga(&mut self, operation_id: &uuid::Uuid) -> Result<(), Self::Err> {
146 query(
147 r#"
148 DELETE FROM saga_state
149 WHERE operation_id = :operation_id
150 "#,
151 )?
152 .bind("operation_id", operation_id.to_string())
153 .execute(&self.inner)
154 .await?;
155
156 Ok(())
157 }
158}
159
160#[async_trait]
161impl<RM> SagaDatabase for SQLMintDatabase<RM>
162where
163 RM: DatabasePool + 'static,
164{
165 type Err = Error;
166
167 async fn get_incomplete_sagas(
168 &self,
169 operation_kind: mint::OperationKind,
170 ) -> Result<Vec<mint::Saga>, Self::Err> {
171 let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
172 Ok(query(
173 r#"
174 SELECT
175 operation_id,
176 operation_kind,
177 state,
178 quote_id,
179 created_at,
180 updated_at
181 FROM
182 saga_state
183 WHERE
184 operation_kind = :operation_kind
185 ORDER BY created_at ASC
186 "#,
187 )?
188 .bind("operation_kind", operation_kind.to_string())
189 .fetch_all(&*conn)
190 .await?
191 .into_iter()
192 .map(sql_row_to_saga)
193 .collect::<Result<Vec<_>, _>>()?)
194 }
195}