cdk-sql-common 0.16.0

Generic SQL storage backend for CDK
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
//! Signatures database implementation

use std::collections::HashMap;
use std::str::FromStr;

use async_trait::async_trait;
use cdk_common::database::{self, Error, MintSignatureTransaction, MintSignaturesDatabase};
use cdk_common::quote_id::QuoteId;
use cdk_common::util::unix_time;
use cdk_common::{Amount, BlindSignature, BlindSignatureDleq, Id, PublicKey, SecretKey};

use super::proofs::sql_row_to_hashmap_amount;
use super::{SQLMintDatabase, SQLTransaction};
use crate::pool::DatabasePool;
use crate::stmt::{query, Column};
use crate::{column_as_nullable_string, column_as_number, column_as_string, unpack_into};

pub(crate) fn sql_row_to_blind_signature(row: Vec<Column>) -> Result<BlindSignature, Error> {
    unpack_into!(
        let (
            keyset_id, amount, c, dleq_e, dleq_s
        ) = row
    );

    let dleq = match (
        column_as_nullable_string!(dleq_e),
        column_as_nullable_string!(dleq_s),
    ) {
        (Some(e), Some(s)) => Some(BlindSignatureDleq {
            e: SecretKey::from_hex(e)?,
            s: SecretKey::from_hex(s)?,
        }),
        _ => None,
    };

    let amount: u64 = column_as_number!(amount);

    Ok(BlindSignature {
        amount: Amount::from(amount),
        keyset_id: column_as_string!(keyset_id, Id::from_str, Id::from_bytes),
        c: column_as_string!(c, PublicKey::from_hex, PublicKey::from_slice),
        dleq,
    })
}

#[async_trait]
impl<RM> MintSignatureTransaction for SQLTransaction<RM>
where
    RM: DatabasePool + 'static,
{
    type Err = Error;

    async fn add_blind_signatures(
        &mut self,
        blinded_messages: &[PublicKey],
        blind_signatures: &[BlindSignature],
        quote_id: Option<QuoteId>,
    ) -> Result<(), Self::Err> {
        let current_time = unix_time();

        if blinded_messages.len() != blind_signatures.len() {
            return Err(database::Error::Internal(
                "Mismatched array lengths for blinded messages and blind signatures".to_string(),
            ));
        }

        // Select all existing rows for the given blinded messages at once
        let mut existing_rows = query(
            r#"
            SELECT blinded_message, c, dleq_e, dleq_s
            FROM blind_signature
            WHERE blinded_message IN (:blinded_messages)
            FOR UPDATE
            "#,
        )?
        .bind_vec(
            "blinded_messages",
            blinded_messages
                .iter()
                .map(|message| message.to_bytes().to_vec())
                .collect(),
        )
        .fetch_all(&self.inner)
        .await?
        .into_iter()
        .map(|mut row| {
            Ok((
                column_as_string!(&row.remove(0), PublicKey::from_hex, PublicKey::from_slice),
                (row[0].clone(), row[1].clone(), row[2].clone()),
            ))
        })
        .collect::<Result<HashMap<_, _>, Error>>()?;

        // Iterate over the provided blinded messages and signatures
        for (message, signature) in blinded_messages.iter().zip(blind_signatures) {
            match existing_rows.remove(message) {
                None => {
                    // Unknown blind message: Insert new row with all columns
                    query(
                        r#"
                        INSERT INTO blind_signature
                        (blinded_message, amount, keyset_id, c, quote_id, dleq_e, dleq_s, created_time, signed_time)
                        VALUES
                        (:blinded_message, :amount, :keyset_id, :c, :quote_id, :dleq_e, :dleq_s, :created_time, :signed_time)
                        "#,
                    )?
                    .bind("blinded_message", message.to_bytes().to_vec())
                    .bind("amount", u64::from(signature.amount) as i64)
                    .bind("keyset_id", signature.keyset_id.to_string())
                    .bind("c", signature.c.to_bytes().to_vec())
                    .bind("quote_id", quote_id.as_ref().map(|q| q.to_string()))
                    .bind(
                        "dleq_e",
                        signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
                    )
                    .bind(
                        "dleq_s",
                        signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
                    )
                    .bind("created_time", current_time as i64)
                    .bind("signed_time", current_time as i64)
                    .execute(&self.inner)
                    .await?;

                    query(
                        r#"
                        INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
                        VALUES (:keyset_id, :amount, 0)
                        ON CONFLICT (keyset_id)
                        DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
                        "#,
                    )?
                    .bind("amount", u64::from(signature.amount) as i64)
                    .bind("keyset_id", signature.keyset_id.to_string())
                    .execute(&self.inner)
                    .await?;
                }
                Some((c, _dleq_e, _dleq_s)) => {
                    // Blind message exists: check if c is NULL
                    match c {
                        Column::Null => {
                            // Blind message with no c: Update with missing columns c, dleq_e, dleq_s
                            query(
                                r#"
                                UPDATE blind_signature
                                SET c = :c, dleq_e = :dleq_e, dleq_s = :dleq_s, signed_time = :signed_time, amount = :amount
                                WHERE blinded_message = :blinded_message
                                "#,
                            )?
                            .bind("c", signature.c.to_bytes().to_vec())
                            .bind(
                                "dleq_e",
                                signature.dleq.as_ref().map(|dleq| dleq.e.to_secret_hex()),
                            )
                            .bind(
                                "dleq_s",
                                signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()),
                            )
                            .bind("blinded_message", message.to_bytes().to_vec())
                            .bind("signed_time", current_time as i64)
                            .bind("amount", u64::from(signature.amount) as i64)
                            .execute(&self.inner)
                            .await?;

                            query(
                                r#"
                                INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed)
                                VALUES (:keyset_id, :amount, 0)
                                ON CONFLICT (keyset_id)
                                DO UPDATE SET total_issued = keyset_amounts.total_issued + EXCLUDED.total_issued
                                "#,
                            )?
                            .bind("amount", u64::from(signature.amount) as i64)
                            .bind("keyset_id", signature.keyset_id.to_string())
                            .execute(&self.inner)
                            .await?;
                        }
                        _ => {
                            // Blind message already has c: Error
                            tracing::error!(
                                "Attempting to add signature to message already signed {}",
                                message
                            );

                            return Err(database::Error::Duplicate);
                        }
                    }
                }
            }
        }

        debug_assert!(
            existing_rows.is_empty(),
            "Unexpected existing rows remain: {:?}",
            existing_rows.keys().collect::<Vec<_>>()
        );

        if !existing_rows.is_empty() {
            tracing::error!("Did not check all existing rows");
            return Err(Error::Internal(
                "Did not check all existing rows".to_string(),
            ));
        }

        Ok(())
    }

    async fn get_blind_signatures(
        &mut self,
        blinded_messages: &[PublicKey],
    ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
        let mut blinded_signatures = query(
            r#"SELECT
                keyset_id,
                amount,
                c,
                dleq_e,
                dleq_s,
                blinded_message
            FROM
                blind_signature
            WHERE blinded_message IN (:b) AND c IS NOT NULL
            "#,
        )?
        .bind_vec(
            "b",
            blinded_messages
                .iter()
                .map(|b| b.to_bytes().to_vec())
                .collect(),
        )
        .fetch_all(&self.inner)
        .await?
        .into_iter()
        .map(|mut row| {
            Ok((
                column_as_string!(
                    &row.pop().ok_or(Error::InvalidDbResponse)?,
                    PublicKey::from_hex,
                    PublicKey::from_slice
                ),
                sql_row_to_blind_signature(row)?,
            ))
        })
        .collect::<Result<HashMap<_, _>, Error>>()?;
        Ok(blinded_messages
            .iter()
            .map(|y| blinded_signatures.remove(y))
            .collect())
    }
}

#[async_trait]
impl<RM> MintSignaturesDatabase for SQLMintDatabase<RM>
where
    RM: DatabasePool + 'static,
{
    type Err = Error;

    async fn get_blind_signatures(
        &self,
        blinded_messages: &[PublicKey],
    ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
        let mut blinded_signatures = query(
            r#"SELECT
                keyset_id,
                amount,
                c,
                dleq_e,
                dleq_s,
                blinded_message
            FROM
                blind_signature
            WHERE blinded_message IN (:b) AND c IS NOT NULL
            "#,
        )?
        .bind_vec(
            "b",
            blinded_messages
                .iter()
                .map(|b_| b_.to_bytes().to_vec())
                .collect(),
        )
        .fetch_all(&*conn)
        .await?
        .into_iter()
        .map(|mut row| {
            Ok((
                column_as_string!(
                    &row.pop().ok_or(Error::InvalidDbResponse)?,
                    PublicKey::from_hex,
                    PublicKey::from_slice
                ),
                sql_row_to_blind_signature(row)?,
            ))
        })
        .collect::<Result<HashMap<_, _>, Error>>()?;
        Ok(blinded_messages
            .iter()
            .map(|y| blinded_signatures.remove(y))
            .collect())
    }

    async fn get_blind_signatures_for_keyset(
        &self,
        keyset_id: &Id,
    ) -> Result<Vec<BlindSignature>, Self::Err> {
        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
        Ok(query(
            r#"
            SELECT
                keyset_id,
                amount,
                c,
                dleq_e,
                dleq_s
            FROM
                blind_signature
            WHERE
                keyset_id=:keyset_id AND c IS NOT NULL
            "#,
        )?
        .bind("keyset_id", keyset_id.to_string())
        .fetch_all(&*conn)
        .await?
        .into_iter()
        .map(sql_row_to_blind_signature)
        .collect::<Result<Vec<BlindSignature>, _>>()?)
    }

    /// Get [`BlindSignature`]s for quote
    async fn get_blind_signatures_for_quote(
        &self,
        quote_id: &QuoteId,
    ) -> Result<Vec<BlindSignature>, Self::Err> {
        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
        Ok(query(
            r#"
            SELECT
                keyset_id,
                amount,
                c,
                dleq_e,
                dleq_s
            FROM
                blind_signature
            WHERE
                quote_id=:quote_id AND c IS NOT NULL
            "#,
        )?
        .bind("quote_id", quote_id.to_string())
        .fetch_all(&*conn)
        .await?
        .into_iter()
        .map(sql_row_to_blind_signature)
        .collect::<Result<Vec<BlindSignature>, _>>()?)
    }

    /// Get total proofs redeemed by keyset id
    async fn get_total_issued(&self) -> Result<HashMap<Id, Amount>, Self::Err> {
        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
        query(
            r#"
            SELECT
                keyset_id,
                total_issued as amount
            FROM
                keyset_amounts
        "#,
        )?
        .fetch_all(&*conn)
        .await?
        .into_iter()
        .map(sql_row_to_hashmap_amount)
        .collect()
    }

    async fn get_blinded_secrets_by_operation_id(
        &self,
        operation_id: &uuid::Uuid,
    ) -> Result<Vec<PublicKey>, Self::Err> {
        let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
        query(
            r#"
            SELECT
                blinded_message
            FROM
                blind_signature
            WHERE
                operation_id = :operation_id
            "#,
        )?
        .bind("operation_id", operation_id.to_string())
        .fetch_all(&*conn)
        .await?
        .into_iter()
        .map(|row| -> Result<PublicKey, Error> {
            Ok(column_as_string!(
                &row[0],
                PublicKey::from_hex,
                PublicKey::from_slice
            ))
        })
        .collect::<Result<Vec<_>, _>>()
    }
}