cdk 0.16.0

Core Cashu Development Kit library implementing the Cashu protocol
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
use std::collections::VecDeque;
use std::sync::Arc;

use cdk_common::database::DynMintDatabase;
use cdk_common::mint::{Operation, Saga, SwapSagaState};
use cdk_common::nuts::BlindedMessage;
use cdk_common::{database, Error, Proofs, ProofsMethods, PublicKey, QuoteId, State};
use tracing::instrument;

use self::compensation::{CompensatingAction, RemoveSwapSetup};
use self::state::{Initial, SetupComplete, Signed};
use crate::mint::subscription::PubSubManager;
use crate::Mint;

pub mod compensation;
mod state;

#[cfg(test)]
mod tests;

/// Saga pattern implementation for atomic swap operations.
///
/// # Why Use the Saga Pattern?
///
/// The swap operation consists of multiple steps that span database transactions
/// and non-transactional operations (blind signing). We need to ensure atomicity
/// across these heterogeneous steps while maintaining consistency in failure scenarios.
///
/// Traditional ACID transactions cannot span:
/// 1. Multiple database transactions (TX1: setup, TX2: finalize)
/// 2. Non-database operations (blind signing of outputs)
///
/// The saga pattern solves this by:
/// - Breaking the operation into discrete steps with clear state transitions
/// - Recording compensating actions for each forward step
/// - Rolling back via compensations if a later step fails
///
/// # Transaction Boundaries
///
/// - **Pre-checks (setup_swap)**: Outputs are verified and the swap is checked to be
///   balanced *before* TX1 is opened
/// - **TX1 (setup_swap)**: Atomically adds input proofs, marks them pending,
///   adds output blinded messages, and persists saga state for crash recovery
/// - **Signing (sign_outputs)**: Non-transactional cryptographic operation
/// - **TX2 (finalize)**: Atomically adds signatures to outputs, marks inputs as spent,
///   records the completed operation, and deletes saga state (best-effort: a failed
///   deletion is only logged and does not abort the transaction)
///
/// Saga state persistence is atomic with swap state changes, ensuring consistency
/// for crash recovery scenarios.
///
/// # Expected Actions
///
/// 1. **setup_swap**: Verifies the swap is balanced, reserves inputs, prepares outputs
///    - Compensation: Removes both inputs and outputs if later steps fail
/// 2. **sign_outputs**: Performs blind signing (no DB changes)
///    - Triggers compensation if signing fails
/// 3. **finalize**: Commits signatures and marks inputs spent
///    - Triggers compensation if finalization fails
///    - Clears compensations on success (swap complete)
///
/// # Failure Handling
///
/// If any step fails after setup_swap, all compensating actions are executed in reverse
/// order to restore the database to its pre-swap state. This ensures no partial swaps
/// leave the system in an inconsistent state. A compensation that itself fails is
/// logged and the remaining compensations still run.
///
/// # Compensation Order (LIFO)
///
/// Compensations are stored in a VecDeque and executed in LIFO (Last-In-First-Out) order:
/// they are registered with `push_front` and drained with `pop_front`. This ensures that
/// actions are undone in the reverse order they were performed, which is critical for
/// maintaining data consistency.
///
/// Example: If we perform actions A → B → C in the forward path, compensations must
/// execute as C' → B' → A' to properly reverse the operations without violating
/// any invariants or constraints.
///
/// # Typestate Pattern
///
/// This saga uses the **typestate pattern** to enforce state transitions at compile-time.
/// Each state (Initial, SetupComplete, Signed) is a distinct type, and operations are
/// only available on the appropriate type:
///
/// ```text
/// SwapSaga<Initial>
///   └─> setup_swap() -> SwapSaga<SetupComplete>
///         └─> sign_outputs() -> SwapSaga<Signed>
///               └─> finalize() -> SwapResponse
/// ```
///
/// **Benefits:**
/// - Invalid state transitions (e.g., `finalize()` before `sign_outputs()`) won't compile
/// - State-specific data (e.g., signatures) only exists in the appropriate state type
/// - No runtime state checks or `Option<T>` unwrapping needed
/// - IDE autocomplete only shows valid operations for each state
pub struct SwapSaga<'a, S> {
    /// Mint used for output/balance verification, fee calculation and blind signing
    mint: &'a super::Mint,
    /// Database handle from which TX1, TX2 and the compensations obtain their access
    db: DynMintDatabase,
    /// Pub/sub manager used to publish proof state changes (Pending / Spent)
    pubsub: Arc<PubSubManager>,
    /// Compensating actions in LIFO order (most recent first)
    compensations: VecDeque<Box<dyn CompensatingAction>>,
    /// Operation ID (used for saga tracking, generated upfront)
    operation_id: uuid::Uuid,
    /// Typestate payload: the data that only exists in the current state `S`
    state_data: S,
}

impl<'a> SwapSaga<'a, Initial> {
    pub fn new(mint: &'a super::Mint, db: DynMintDatabase, pubsub: Arc<PubSubManager>) -> Self {
        let operation_id = uuid::Uuid::new_v4();

        Self {
            mint,
            db,
            pubsub,
            compensations: VecDeque::new(),
            operation_id,
            state_data: Initial { operation_id },
        }
    }

    /// Sets up the swap by atomically verifying balance and reserving inputs/outputs.
    ///
    /// This is the first transaction (TX1) in the saga and must complete before blind signing.
    ///
    /// # What This Does
    ///
    /// Within a single database transaction:
    /// 1. Verifies the swap is balanced (input amount >= output amount + fees)
    /// 2. Adds input proofs to the database
    /// 3. Updates input proof states from Unspent to Pending
    /// 4. Adds output blinded messages to the database
    /// 5. Persists saga state for crash recovery (atomic with steps 1-4)
    /// 6. Publishes proof state changes via pubsub
    ///
    /// # Compensation
    ///
    /// Registers a compensation action that will remove both the input proofs and output
    /// blinded messages if any subsequent step (signing or finalization) fails.
    ///
    /// # Errors
    ///
    /// - `TokenPending`: Proofs are already pending or blinded messages are duplicates
    /// - `TokenAlreadySpent`: Proofs have already been spent
    /// - `DuplicateOutputs`: Output blinded messages already exist
    #[instrument(skip_all)]
    pub async fn setup_swap(
        mut self,
        input_proofs: &Proofs,
        blinded_messages: &[BlindedMessage],
        quote_id: Option<QuoteId>,
        input_verification: crate::mint::Verification,
    ) -> Result<SwapSaga<'a, SetupComplete>, Error> {
        let output_verification = self.mint.verify_outputs(blinded_messages).map_err(|err| {
            tracing::debug!("Output verification failed: {:?}", err);
            err
        })?;

        // Verify balance within the transaction
        self.mint
            .verify_transaction_balanced(
                input_verification.clone(),
                output_verification.clone(),
                input_proofs,
            )
            .await?;

        // Calculate amounts to create Operation
        let total_redeemed = input_verification.amount;
        let total_issued = output_verification.amount;

        let fee_breakdown = self.mint.get_proofs_fee(input_proofs).await?;

        // Create Operation with actual amounts now that we know them
        // Convert typed amounts to untyped for Operation::new
        let operation = Operation::new(
            self.state_data.operation_id,
            cdk_common::mint::OperationKind::Swap,
            total_issued.clone().into(),
            total_redeemed.clone().into(),
            fee_breakdown.total,
            None, // complete_at
            None, // payment_method (not applicable for swap)
        );

        let mut tx = self.db.begin_transaction().await?;

        // Add input proofs to DB
        let mut new_proofs = match tx
            .add_proofs(input_proofs.clone(), quote_id.clone(), &operation)
            .await
        {
            Ok(proofs) => proofs,
            Err(err) => {
                tx.rollback().await?;
                return Err(match err {
                    database::Error::Duplicate => Error::TokenPending,
                    database::Error::AttemptUpdateSpentProof => Error::TokenAlreadySpent,
                    _ => Error::Database(err),
                });
            }
        };

        let ys = match input_proofs.ys() {
            Ok(ys) => ys,
            Err(err) => return Err(Error::NUT00(err)),
        };

        if let Err(err) = Mint::update_proofs_state(&mut tx, &mut new_proofs, State::Pending).await
        {
            tx.rollback().await?;
            return Err(err);
        }

        // Add output blinded messages
        if let Err(err) = tx
            .add_blinded_messages(quote_id.as_ref(), blinded_messages, &operation)
            .await
        {
            tx.rollback().await?;
            return Err(match err {
                database::Error::Duplicate => Error::DuplicateOutputs,
                _ => Error::Database(err),
            });
        }

        // Store data in saga struct (avoid duplication in state enum)
        let blinded_messages_vec = blinded_messages.to_vec();
        let blinded_secrets: Vec<PublicKey> = blinded_messages_vec
            .iter()
            .map(|bm| bm.blinded_secret)
            .collect();

        // Persist saga state for crash recovery (atomic with TX1)
        let saga = Saga::new_swap(self.operation_id, SwapSagaState::SetupComplete);

        if let Err(err) = tx.add_saga(&saga).await {
            tx.rollback().await?;
            return Err(err.into());
        }

        tx.commit().await?;
        // Publish proof state changes
        for pk in &ys {
            self.pubsub.proof_state((*pk, State::Pending));
        }
        // Register compensation (uses LIFO via push_front)
        self.compensations.push_front(Box::new(RemoveSwapSetup {
            blinded_secrets: blinded_secrets.clone(),
            input_ys: ys.clone(),
            operation_id: self.operation_id,
        }));

        // Transition to SetupComplete state
        Ok(SwapSaga {
            mint: self.mint,
            db: self.db,
            pubsub: self.pubsub,
            compensations: self.compensations,
            operation_id: self.operation_id,
            state_data: SetupComplete {
                blinded_messages: blinded_messages_vec,
                ys,
                operation,
                fee_breakdown,
            },
        })
    }
}

impl<'a> SwapSaga<'a, SetupComplete> {
    /// Blind-signs the outputs that were reserved by `setup_swap`.
    ///
    /// This step sits between the two database transactions of the saga; the method
    /// itself opens no transaction on the success path.
    ///
    /// # What This Does
    ///
    /// 1. Clones the blinded messages held in the `SetupComplete` state data
    /// 2. Asks the mint to blind-sign them
    /// 3. Moves the saga, together with the new signatures, into the `Signed` state
    ///
    /// # Failure Handling
    ///
    /// When signing fails, every registered compensation is run (via `compensate_all`)
    /// so that the work done by the setup transaction is undone, and the signing error
    /// is then returned to the caller.
    ///
    /// # Errors
    ///
    /// - Propagates any error returned by the blind signing operation
    #[instrument(skip_all)]
    pub async fn sign_outputs(self) -> Result<SwapSaga<'a, Signed>, Error> {
        let outputs = self.state_data.blinded_messages.clone();

        let signatures = match self.mint.blind_sign(outputs).await {
            Ok(signatures) => signatures,
            Err(err) => {
                // Undo the setup transaction before surfacing the signing error.
                self.compensate_all().await?;
                return Err(err);
            }
        };

        // Transition to Signed state.
        // Note: the persisted saga state is deliberately left untouched here. The
        // "signed" state is not used by recovery logic, so the stored state remains
        // "SetupComplete" until the swap is finalized or compensated.
        let previous = self.state_data;
        let signed = Signed {
            blinded_messages: previous.blinded_messages,
            ys: previous.ys,
            signatures,
            operation: previous.operation,
            fee_breakdown: previous.fee_breakdown,
        };

        Ok(SwapSaga {
            mint: self.mint,
            db: self.db,
            pubsub: self.pubsub,
            compensations: self.compensations,
            operation_id: self.operation_id,
            state_data: signed,
        })
    }
}

impl SwapSaga<'_, Signed> {
    /// Completes the swap: persists the signatures and spends the inputs.
    ///
    /// This is the second and last transaction (TX2) of the saga.
    ///
    /// # What This Does
    ///
    /// Inside one database transaction:
    /// 1. Stores the blind signatures for the output blinded messages
    /// 2. Loads the input proofs and moves their state to Spent
    /// 3. Records the completed operation together with the per-keyset fees
    /// 4. Deletes the saga state (best-effort: a failure is logged, not returned)
    ///
    /// After the commit:
    /// 1. Publishes the Spent proof states via pubsub
    /// 2. Clears the registered compensations, since the swap is complete
    ///
    /// # Failure Handling
    ///
    /// When one of steps 1-3 fails, the transaction is rolled back, all registered
    /// compensations are run to undo the setup transaction, and the original error is
    /// returned. The signatures are not persisted in that case.
    ///
    /// NOTE(review): the rollback is awaited with `?`, so if the rollback itself
    /// fails its error is returned immediately and `compensate_all` is not run.
    /// Confirm that crash recovery covers that case.
    ///
    /// # Success
    ///
    /// The returned response carries the signatures the client needs to construct
    /// its new proofs. A failed saga-state deletion only produces a warning; the
    /// leftover saga row is expected to be cleaned up by recovery.
    ///
    /// # Errors
    ///
    /// - Propagates errors from storing signatures, loading or updating proofs,
    ///   recording the operation, rolling back, and committing
    #[instrument(skip_all)]
    pub async fn finalize(mut self) -> Result<cdk_common::nuts::SwapResponse, Error> {
        let output_secrets: Vec<PublicKey> = self
            .state_data
            .blinded_messages
            .iter()
            .map(|message| message.blinded_secret)
            .collect();

        let mut tx = self.db.begin_transaction().await?;

        // Test-only failure injection for the "add signatures" step.
        // TODO: move the failure injection into the db layer so this extra rollback
        // is not needed and the error originates from the same place in test and prod.
        #[cfg(test)]
        {
            if crate::test_helpers::mint::should_fail_for("ADD_SIGNATURES") {
                tx.rollback().await?;
                self.compensate_all().await?;
                return Err(Error::Database(database::Error::Database(
                    "Test failure: ADD_SIGNATURES".into(),
                )));
            }
        }

        // Step 1: attach the blind signatures to the stored outputs.
        match tx
            .add_blind_signatures(&output_secrets, &self.state_data.signatures, None)
            .await
        {
            Ok(_) => {}
            Err(err) => {
                tx.rollback().await?;
                self.compensate_all().await?;
                return Err(err.into());
            }
        }

        // Test-only failure injection for the "mark inputs spent" step.
        // TODO: move the failure injection into the db layer so this extra rollback
        // is not needed and the error originates from the same place in test and prod.
        #[cfg(test)]
        {
            if crate::test_helpers::mint::should_fail_for("UPDATE_PROOFS") {
                tx.rollback().await?;
                self.compensate_all().await?;
                return Err(Error::Database(database::Error::Database(
                    "Test failure: UPDATE_PROOFS".into(),
                )));
            }
        }

        // Step 2: load the input proofs inside TX2 and mark them spent.
        let mut spent_inputs = match tx.get_proofs(&self.state_data.ys).await {
            Ok(found) => found,
            Err(err) => {
                tx.rollback().await?;
                self.compensate_all().await?;
                return Err(err.into());
            }
        };

        match Mint::update_proofs_state(&mut tx, &mut spent_inputs, State::Spent).await {
            Ok(_) => {}
            Err(err) => {
                tx.rollback().await?;
                self.compensate_all().await?;
                return Err(err);
            }
        }

        // Step 3: record the finished operation along with its per-keyset fees.
        match tx
            .add_completed_operation(
                &self.state_data.operation,
                &self.state_data.fee_breakdown.per_keyset,
            )
            .await
        {
            Ok(_) => {}
            Err(err) => {
                tx.rollback().await?;
                self.compensate_all().await?;
                return Err(err.into());
            }
        }

        // Step 4: drop the saga row as part of TX2. This is best-effort: a failure
        // must not fail the swap, and no rollback happens here because an orphaned
        // saga is harmless and will be cleaned up on next recovery.
        if let Err(e) = tx.delete_saga(&self.operation_id).await {
            tracing::warn!(
                "Failed to delete saga in finalize (will be cleaned up on recovery): {}",
                e
            );
        }

        tx.commit().await?;

        // Only announce the Spent state once the commit has gone through.
        for y in self.state_data.ys.iter().copied() {
            self.pubsub.proof_state((y, State::Spent));
        }

        // The swap is complete, so nothing is left to compensate.
        self.compensations.clear();

        let signatures = self.state_data.signatures;
        Ok(cdk_common::nuts::SwapResponse::new(signatures))
    }
}

impl<S> SwapSaga<'_, S> {
    /// Runs every registered compensating action, consuming the saga.
    ///
    /// Taking `self` by value guarantees the saga cannot be driven any further once
    /// compensation has started.
    ///
    /// Actions are taken from the front of the queue first, i.e. the most recently
    /// registered action runs first. A failing action is logged and the remaining
    /// actions still run, so this method always returns `Ok(())`.
    ///
    /// With the `prometheus` feature enabled, and only when there is at least one
    /// action to run, a swap failure is recorded and the in-flight request counter
    /// is decremented.
    #[instrument(skip_all)]
    async fn compensate_all(self) -> Result<(), Error> {
        let pending = self.compensations;

        // Nothing was registered, so there is nothing to undo or report.
        if pending.is_empty() {
            return Ok(());
        }

        #[cfg(feature = "prometheus")]
        {
            use cdk_prometheus::METRICS;

            self.mint.record_swap_failure("process_swap_request");
            METRICS.dec_in_flight_requests("process_swap_request");
        }

        tracing::warn!("Running {} compensating actions", pending.len());

        // Iterating the owned deque visits elements front to back, which is the
        // LIFO order established by `push_front` at registration time.
        for action in pending {
            tracing::debug!("Running compensation: {}", action.name());

            let outcome = action.execute(&self.db, &self.pubsub).await;
            if let Err(e) = outcome {
                tracing::error!(
                    "Compensation {} failed: {}. Continuing...",
                    action.name(),
                    e
                );
            }
        }

        Ok(())
    }
}