// cdk_bdk/storage/mod.rs

1//! BDK storage operations using KV store
2
3use std::sync::Arc;
4
5use cdk_common::database::KVStore;
6use serde::de::DeserializeOwned;
7use serde::Serialize;
8
9use crate::error::Error;
10use crate::receive::receive_intent::record::ReceiveIntentRecord;
11use crate::send::batch_transaction::record::SendBatchRecord;
12use crate::send::payment_intent::record::SendIntentRecord;
13
14pub mod receive;
15pub mod send;
16mod types;
17
18pub use types::{FailedSendAttemptRecord, FinalizedReceiveIntentRecord, FinalizedSendIntentRecord};
19
/// Primary namespace for BDK KV store operations.
///
/// Every secondary namespace declared in this module is used beneath it.
pub const BDK_NAMESPACE: &str = "bdk";

/// Secondary namespace for send intents
pub const SEND_INTENT_NAMESPACE: &str = "send_intent";

/// Secondary namespace for send intent quote id index
pub const SEND_INTENT_QUOTE_ID_NAMESPACE: &str = "send_intent_quote_id";

/// Secondary namespace for send batches
pub const SEND_BATCH_NAMESPACE: &str = "send_batch";

/// Secondary namespace for failed pre-sign send attempt tombstones.
pub const FAILED_SEND_ATTEMPT_NAMESPACE: &str = "failed_send_attempt";

/// Secondary namespace for finalized (confirmed) intents.
/// Stores tombstone records so `check_outgoing_payment` can return
/// correct `total_spent` after the active intent has been deleted.
pub const FINALIZED_INTENT_NAMESPACE: &str = "finalized_intent";

/// Secondary namespace for tracked receive address index (address -> quote_id)
pub const RECEIVE_ADDRESS_QUOTE_ID_NAMESPACE: &str = "receive_address_quote_id";

/// Secondary namespace for receive intents (keyed by intent_id)
pub const RECEIVE_INTENT_NAMESPACE: &str = "receive_intent";

/// Secondary namespace for receive intent outpoint index (outpoint -> intent_id)
pub const RECEIVE_INTENT_OUTPOINT_NAMESPACE: &str = "receive_intent_outpoint";

/// Secondary namespace for finalized (confirmed) receive intents.
/// Stores tombstone records so `check_incoming_payment_status` can
/// return historical data after the active intent has been deleted.
pub const FINALIZED_RECEIVE_INTENT_NAMESPACE: &str = "finalized_receive_intent";

/// Secondary namespace for finalized receive intent outpoint index (outpoint -> intent_id)
pub const FINALIZED_RECEIVE_INTENT_OUTPOINT_NAMESPACE: &str = "finalized_receive_intent_outpoint";
56
/// Secondary-namespace prefix for the finalized receive-intent quote-id index.
///
/// Full namespace: `finalized_receive_intent_by_quote__<quote_id>`, with
/// one key per finalized intent (`<intent_id>` → empty value). Storing
/// each intent under its own key lets `finalize_receive_intent` commit a
/// single idempotent `kv_write` instead of an RMW on a serialized list,
/// which would otherwise race under Postgres `READ COMMITTED`.
pub const FINALIZED_RECEIVE_INTENT_BY_QUOTE_NAMESPACE_PREFIX: &str =
    "finalized_receive_intent_by_quote";

/// Build the per-quote secondary namespace used to index finalized receive intents.
///
/// The result is the prefix, a double underscore, then `quote_id` verbatim.
/// `quote_id` is not validated or escaped here, so callers must pass a value
/// that is already acceptable as part of a namespace.
pub fn finalized_receive_intent_by_quote_namespace(quote_id: &str) -> String {
    format!("{FINALIZED_RECEIVE_INTENT_BY_QUOTE_NAMESPACE_PREFIX}__{quote_id}")
}
71
/// Secondary namespace for finalized send intent quote id index (quote_id -> intent_id)
pub const FINALIZED_SEND_INTENT_QUOTE_ID_NAMESPACE: &str = "finalized_send_intent_quote_id";
74
/// Encode an outpoint string for use as a KV store key.
///
/// The KV store only allows ASCII letters, numbers, underscore, and
/// hyphen. Outpoint strings contain `:` (e.g. `txid:vout`), so we
/// replace it with `-`.
///
/// Only `:` is rewritten; every other character is passed through
/// unchanged, so the input is expected to be otherwise key-safe.
fn outpoint_to_key(outpoint: &str) -> String {
    outpoint.replace(':', "-")
}
83
84pub trait KvRecord: Serialize + DeserializeOwned + Sized {
85    const NAMESPACE: &'static str;
86
87    fn key(&self) -> String;
88}
89
90pub(crate) trait ReplaceState<S>: KvRecord {
91    fn replace_state(&mut self, state: S);
92}
93
94/// BDK KV store operations
95#[derive(Clone)]
96pub struct BdkStorage {
97    pub(crate) kv_store: Arc<dyn KVStore<Err = cdk_common::database::Error> + Send + Sync>,
98}
99
100impl BdkStorage {
101    /// Create a new BdkStorage instance
102    pub fn new(
103        kv_store: Arc<dyn KVStore<Err = cdk_common::database::Error> + Send + Sync>,
104    ) -> Self {
105        Self { kv_store }
106    }
107
108    async fn put_record<T>(&self, record: &T) -> Result<(), Error>
109    where
110        T: KvRecord,
111    {
112        let serialized = serde_json::to_vec(record)?;
113        let mut tx = self
114            .kv_store
115            .begin_transaction()
116            .await
117            .map_err(Error::from)?;
118        tx.kv_write(BDK_NAMESPACE, T::NAMESPACE, &record.key(), &serialized)
119            .await
120            .map_err(Error::from)?;
121        tx.commit().await.map_err(Error::from)?;
122        Ok(())
123    }
124
125    async fn get_record<T>(&self, key: &str) -> Result<Option<T>, Error>
126    where
127        T: KvRecord,
128    {
129        let data = self
130            .kv_store
131            .kv_read(BDK_NAMESPACE, T::NAMESPACE, key)
132            .await
133            .map_err(Error::from)?;
134
135        match data {
136            Some(bytes) => Ok(Some(serde_json::from_slice(&bytes)?)),
137            None => Ok(None),
138        }
139    }
140
141    async fn list_records<T>(&self) -> Result<Vec<T>, Error>
142    where
143        T: KvRecord,
144    {
145        let keys = self
146            .kv_store
147            .kv_list(BDK_NAMESPACE, T::NAMESPACE)
148            .await
149            .map_err(Error::from)?;
150
151        let mut records = Vec::new();
152        for key in keys {
153            if let Some(data) = self
154                .kv_store
155                .kv_read(BDK_NAMESPACE, T::NAMESPACE, &key)
156                .await
157                .map_err(Error::from)?
158            {
159                match serde_json::from_slice::<T>(&data) {
160                    Ok(record) => records.push(record),
161                    Err(e) => {
162                        tracing::warn!("Failed to deserialize {} {}: {}", T::NAMESPACE, key, e);
163                    }
164                }
165            }
166        }
167
168        Ok(records)
169    }
170
171    async fn delete_record<T>(&self, key: &str) -> Result<(), Error>
172    where
173        T: KvRecord,
174    {
175        let mut tx = self
176            .kv_store
177            .begin_transaction()
178            .await
179            .map_err(Error::from)?;
180        tx.kv_remove(BDK_NAMESPACE, T::NAMESPACE, key)
181            .await
182            .map_err(Error::from)?;
183        tx.commit().await.map_err(Error::from)?;
184        Ok(())
185    }
186
187    async fn update_record_state<T, S>(&self, key: &str, new_state: &S) -> Result<(), Error>
188    where
189        T: ReplaceState<S>,
190        S: Clone,
191    {
192        let data = self
193            .kv_store
194            .kv_read(BDK_NAMESPACE, T::NAMESPACE, key)
195            .await
196            .map_err(Error::from)?;
197
198        let Some(bytes) = data else {
199            return Err(Error::Wallet(format!(
200                "Record not found in namespace {} for key {}",
201                T::NAMESPACE,
202                key
203            )));
204        };
205
206        let mut record: T = serde_json::from_slice(&bytes)?;
207        record.replace_state(new_state.clone());
208        self.put_record(&record).await
209    }
210}
211
212impl KvRecord for SendIntentRecord {
213    const NAMESPACE: &'static str = SEND_INTENT_NAMESPACE;
214
215    fn key(&self) -> String {
216        self.intent_id.to_string()
217    }
218}
219
220impl KvRecord for FailedSendAttemptRecord {
221    const NAMESPACE: &'static str = FAILED_SEND_ATTEMPT_NAMESPACE;
222
223    fn key(&self) -> String {
224        self.attempt_id.to_string()
225    }
226}
227
228impl ReplaceState<crate::send::payment_intent::record::SendIntentState> for SendIntentRecord {
229    fn replace_state(&mut self, state: crate::send::payment_intent::record::SendIntentState) {
230        self.state = state;
231    }
232}
233
234impl KvRecord for SendBatchRecord {
235    const NAMESPACE: &'static str = SEND_BATCH_NAMESPACE;
236
237    fn key(&self) -> String {
238        self.batch_id.to_string()
239    }
240}
241
242impl ReplaceState<crate::send::batch_transaction::record::SendBatchState> for SendBatchRecord {
243    fn replace_state(&mut self, state: crate::send::batch_transaction::record::SendBatchState) {
244        self.state = state;
245    }
246}
247
248impl KvRecord for ReceiveIntentRecord {
249    const NAMESPACE: &'static str = RECEIVE_INTENT_NAMESPACE;
250
251    fn key(&self) -> String {
252        self.intent_id.to_string()
253    }
254}
255
256impl KvRecord for FinalizedSendIntentRecord {
257    const NAMESPACE: &'static str = FINALIZED_INTENT_NAMESPACE;
258
259    fn key(&self) -> String {
260        self.intent_id.to_string()
261    }
262}
263
264impl KvRecord for FinalizedReceiveIntentRecord {
265    const NAMESPACE: &'static str = FINALIZED_RECEIVE_INTENT_NAMESPACE;
266
267    fn key(&self) -> String {
268        self.intent_id.to_string()
269    }
270}
271
272#[cfg(test)]
273mod tests {
274    use std::sync::Arc;
275
276    use uuid::Uuid;
277
278    use super::*;
279    use crate::send::batch_transaction::record::{
280        BatchOutputAssignment, SendBatchRecord, SendBatchState,
281    };
282    use crate::send::payment_intent::record::{SendIntentRecord, SendIntentState};
283    use crate::types::{PaymentMetadata, PaymentTier};
284
285    /// Helper: create an in-memory KVStore-backed BdkStorage for tests
286    async fn test_storage() -> BdkStorage {
287        let db = cdk_sqlite::mint::memory::empty()
288            .await
289            .expect("in-memory db");
290        BdkStorage::new(Arc::new(db))
291    }
292
293    /// Helper: build a test SendIntentRecord in Pending state
294    fn make_pending_intent(intent_id: Uuid) -> SendIntentRecord {
295        SendIntentRecord {
296            intent_id,
297            quote_id: "test-quote-1".to_string(),
298            address: "bcrt1qw508d6qejxtdg4y5r3zarvary0c5xw7kygt080".to_string(),
299            amount_sat: 50_000,
300            max_fee_amount_sat: 1_000,
301            tier: PaymentTier::Immediate,
302            metadata: PaymentMetadata::default(),
303            state: SendIntentState::Pending {
304                created_at: 1_700_000_000,
305            },
306        }
307    }
308
309    // ── Serialization round-trip tests ─────────────────────────────
310
311    #[test]
312    fn test_send_intent_record_state_roundtrip() {
313        let batch_id = Uuid::new_v4();
314
315        let states = vec![
316            SendIntentState::Pending {
317                created_at: 1_700_000_000,
318            },
319            SendIntentState::Batched {
320                batch_id,
321                created_at: 1_700_000_000,
322            },
323            SendIntentState::AwaitingConfirmation {
324                batch_id,
325                txid: "abc123def456".to_string(),
326                outpoint: "abc123def456:0".to_string(),
327                fee_contribution_sat: 250,
328                created_at: 1_700_000_000,
329            },
330            SendIntentState::Failed {
331                reason: "pre-sign failure".to_string(),
332                created_at: 1_700_000_000,
333                failed_at: 1_700_000_100,
334            },
335        ];
336
337        for state in states {
338            let json = serde_json::to_string(&state).expect("serialize state");
339            let deserialized: SendIntentState =
340                serde_json::from_str(&json).expect("deserialize state");
341
342            // Re-serialize and compare JSON to verify round-trip
343            let json2 = serde_json::to_string(&deserialized).expect("re-serialize state");
344            assert_eq!(json, json2, "Round-trip failed for state variant");
345        }
346
347        // Also test full SendIntentRecord round-trip
348        let intent = SendIntentRecord {
349            intent_id: Uuid::new_v4(),
350            quote_id: "quote-123".to_string(),
351            address: "bcrt1qw508d6qejxtdg4y5r3zarvary0c5xw7kygt080".to_string(),
352            amount_sat: 100_000,
353            max_fee_amount_sat: 5_000,
354            tier: PaymentTier::Standard,
355            metadata: PaymentMetadata::from_optional_json(Some(r#"{"key": "value"}"#)),
356            state: SendIntentState::Pending {
357                created_at: 1_700_000_000,
358            },
359        };
360        let json = serde_json::to_string(&intent).expect("serialize intent");
361        let deserialized: SendIntentRecord =
362            serde_json::from_str(&json).expect("deserialize intent");
363        assert_eq!(intent.intent_id, deserialized.intent_id);
364        assert_eq!(intent.quote_id, deserialized.quote_id);
365        assert_eq!(intent.address, deserialized.address);
366        assert_eq!(intent.amount_sat, deserialized.amount_sat);
367        assert_eq!(intent.max_fee_amount_sat, deserialized.max_fee_amount_sat);
368    }
369
370    #[test]
371    fn test_send_batch_record_state_roundtrip() {
372        let intent_ids = vec![Uuid::new_v4(), Uuid::new_v4()];
373        let assignments: Vec<BatchOutputAssignment> = intent_ids
374            .iter()
375            .enumerate()
376            .map(|(idx, id)| BatchOutputAssignment {
377                intent_id: *id,
378                vout: idx as u32,
379                fee_contribution_sat: 125,
380            })
381            .collect();
382
383        let states = vec![
384            SendBatchState::Built {
385                psbt_bytes: vec![0x01, 0x02, 0x03, 0x04],
386                intent_ids: intent_ids.clone(),
387            },
388            SendBatchState::Signed {
389                tx_bytes: vec![0x05, 0x06, 0x07, 0x08],
390                assignments: assignments.clone(),
391                fee_sat: 250,
392            },
393            SendBatchState::Broadcast {
394                txid: "deadbeef1234".to_string(),
395                tx_bytes: vec![0x05, 0x06, 0x07, 0x08],
396                assignments: assignments.clone(),
397                fee_sat: 1000,
398            },
399        ];
400
401        for state in states {
402            let json = serde_json::to_string(&state).expect("serialize state");
403            let deserialized: SendBatchState =
404                serde_json::from_str(&json).expect("deserialize state");
405
406            let json2 = serde_json::to_string(&deserialized).expect("re-serialize state");
407            assert_eq!(json, json2, "Round-trip failed for batch state variant");
408        }
409
410        // Also test full SendBatchRecord round-trip
411        let batch = SendBatchRecord {
412            batch_id: Uuid::new_v4(),
413            state: SendBatchState::Broadcast {
414                txid: "abc".to_string(),
415                tx_bytes: vec![1, 2, 3],
416                assignments,
417                fee_sat: 500,
418            },
419        };
420        let json = serde_json::to_string(&batch).expect("serialize batch");
421        let deserialized: SendBatchRecord = serde_json::from_str(&json).expect("deserialize batch");
422        assert_eq!(batch.batch_id, deserialized.batch_id);
423    }
424
425    // ── CRUD tests for send intent storage ────────────────
426
427    #[tokio::test]
428    async fn test_send_intent_crud() {
429        let storage = test_storage().await;
430        let intent_id = Uuid::new_v4();
431        let intent = make_pending_intent(intent_id);
432
433        // Store
434        storage
435            .create_send_intent_if_absent(&intent)
436            .await
437            .expect("store");
438
439        // Get
440        let fetched = storage
441            .get_send_intent(&intent_id)
442            .await
443            .expect("get")
444            .expect("should exist");
445        assert_eq!(fetched.intent_id, intent_id);
446        assert_eq!(fetched.quote_id, "test-quote-1");
447        assert_eq!(fetched.amount_sat, 50_000);
448        assert!(matches!(
449            fetched.state,
450            SendIntentState::Pending {
451                created_at: 1_700_000_000
452            }
453        ));
454
455        // Update state to Batched
456        let batch_id = Uuid::new_v4();
457        storage
458            .update_send_intent(
459                &intent_id,
460                &SendIntentState::Batched {
461                    batch_id,
462                    created_at: 1_700_000_000,
463                },
464            )
465            .await
466            .expect("update");
467
468        let updated = storage
469            .get_send_intent(&intent_id)
470            .await
471            .expect("get")
472            .expect("should exist");
473        match &updated.state {
474            SendIntentState::Batched {
475                batch_id: bid,
476                created_at,
477            } => {
478                assert_eq!(*bid, batch_id);
479                assert_eq!(*created_at, 1_700_000_000);
480            }
481            other => panic!("Expected Batched, got {:?}", other),
482        }
483
484        // get_all_send_intents
485        let all = storage.get_all_send_intents().await.expect("get_all");
486        assert_eq!(all.len(), 1);
487
488        // get_pending_send_intents should now be empty (intent is Batched)
489        let pending = storage
490            .get_pending_send_intents()
491            .await
492            .expect("get_pending");
493        assert!(
494            pending.is_empty(),
495            "Batched intent should not appear in pending"
496        );
497
498        // Revert to Pending and check pending filter
499        storage
500            .update_send_intent(
501                &intent_id,
502                &SendIntentState::Pending {
503                    created_at: 1_700_000_000,
504                },
505            )
506            .await
507            .expect("revert");
508        let pending = storage
509            .get_pending_send_intents()
510            .await
511            .expect("get_pending");
512        assert_eq!(pending.len(), 1);
513
514        // Delete
515        storage
516            .delete_send_intent(&intent_id)
517            .await
518            .expect("delete");
519        let gone = storage.get_send_intent(&intent_id).await.expect("get");
520        assert!(gone.is_none(), "Intent should be deleted");
521
522        // get_all should now be empty
523        let all = storage.get_all_send_intents().await.expect("get_all");
524        assert!(all.is_empty());
525    }
526
527    #[tokio::test]
528    async fn test_create_send_intent_if_absent_rejects_duplicate_quote_id() {
529        let storage = test_storage().await;
530        let first = make_pending_intent(Uuid::new_v4());
531        storage
532            .create_send_intent_if_absent(&first)
533            .await
534            .expect("store first");
535
536        let mut second = make_pending_intent(Uuid::new_v4());
537        second.address = "bcrt1qother".to_string();
538
539        let err = storage
540            .create_send_intent_if_absent(&second)
541            .await
542            .expect_err("duplicate quote id should fail");
543        assert!(matches!(err, Error::DuplicateQuoteId(_)));
544    }
545
546    /// Regression test for un-rolled-back transaction on the active-duplicate
547    /// path. Prior to the fix, `create_send_intent_if_absent` returned
548    /// `DuplicateQuoteId` without rolling back the open transaction, violating
549    /// the `DbTransactionFinalizer` contract. This test hits the duplicate
550    /// branch many times and then performs follow-up storage operations to
551    /// prove the backend isn't starved of connections/locks and subsequent
552    /// writes still succeed.
553    #[tokio::test]
554    async fn test_create_send_intent_if_absent_active_duplicate_rolls_back_tx() {
555        let storage = test_storage().await;
556        let first = make_pending_intent(Uuid::new_v4());
557        storage
558            .create_send_intent_if_absent(&first)
559            .await
560            .expect("store first");
561
562        // Repeatedly trigger the active-duplicate branch. If the transaction
563        // were leaked (never committed or rolled back), a pool-backed KV store
564        // could eventually deadlock or return an error here.
565        for _ in 0..16 {
566            let mut dup = make_pending_intent(Uuid::new_v4());
567            dup.address = "bcrt1qother".to_string();
568            let err = storage
569                .create_send_intent_if_absent(&dup)
570                .await
571                .expect_err("duplicate quote id should fail");
572            assert!(matches!(err, Error::DuplicateQuoteId(_)));
573        }
574
575        // A follow-up write with a fresh quote id must still succeed,
576        // proving the store is not wedged by a leaked transaction.
577        let mut follow_up = make_pending_intent(Uuid::new_v4());
578        follow_up.quote_id = "test-quote-follow-up".to_string();
579        storage
580            .create_send_intent_if_absent(&follow_up)
581            .await
582            .expect("follow-up write must succeed after duplicate rejection");
583
584        // Sanity: the original intent is untouched.
585        let original = storage
586            .get_send_intent(&first.intent_id)
587            .await
588            .expect("get first")
589            .expect("first intent should still exist");
590        assert_eq!(original.intent_id, first.intent_id);
591        assert_eq!(original.quote_id, first.quote_id);
592    }
593
594    /// Regression test for un-rolled-back transaction on the finalized-duplicate
595    /// path. Mirrors the active-duplicate test but exercises the second
596    /// early-return branch (finalized tombstone present).
597    #[tokio::test]
598    async fn test_create_send_intent_if_absent_finalized_duplicate_rolls_back_tx() {
599        let storage = test_storage().await;
600        let intent = make_pending_intent(Uuid::new_v4());
601        let intent_id = intent.intent_id;
602        let quote_id = intent.quote_id.clone();
603        storage
604            .create_send_intent_if_absent(&intent)
605            .await
606            .expect("store intent");
607
608        let tombstone = FinalizedSendIntentRecord {
609            intent_id,
610            quote_id: quote_id.clone(),
611            total_spent_sat: 50_500,
612            outpoint: "txid:0".to_string(),
613            finalized_at: 1_700_000_001,
614        };
615        storage
616            .finalize_send_intent(&intent_id, &tombstone)
617            .await
618            .expect("finalize intent");
619
620        // Repeatedly trigger the finalized-duplicate branch.
621        for _ in 0..16 {
622            let mut dup = make_pending_intent(Uuid::new_v4());
623            dup.quote_id = quote_id.clone();
624            let err = storage
625                .create_send_intent_if_absent(&dup)
626                .await
627                .expect_err("finalized quote id should be rejected");
628            assert!(matches!(err, Error::DuplicateQuoteId(_)));
629        }
630
631        // A follow-up write with a fresh quote id must still succeed.
632        let mut follow_up = make_pending_intent(Uuid::new_v4());
633        follow_up.quote_id = "test-quote-follow-up-finalized".to_string();
634        storage
635            .create_send_intent_if_absent(&follow_up)
636            .await
637            .expect("follow-up write must succeed after duplicate rejection");
638    }
639
640    #[tokio::test]
641    async fn test_finalize_send_intent_removes_quote_id_index() {
642        let storage = test_storage().await;
643        let intent = make_pending_intent(Uuid::new_v4());
644        let intent_id = intent.intent_id;
645        let quote_id = intent.quote_id.clone();
646        storage
647            .create_send_intent_if_absent(&intent)
648            .await
649            .expect("store intent");
650
651        let tombstone = FinalizedSendIntentRecord {
652            intent_id,
653            quote_id: quote_id.clone(),
654            total_spent_sat: 50_500,
655            outpoint: "txid:0".to_string(),
656            finalized_at: 1_700_000_001,
657        };
658        storage
659            .finalize_send_intent(&intent_id, &tombstone)
660            .await
661            .expect("finalize intent");
662
663        assert!(storage
664            .get_send_intent(&intent_id)
665            .await
666            .expect("get intent")
667            .is_none());
668        assert!(storage
669            .get_send_intent_by_quote_id(&quote_id)
670            .await
671            .expect("lookup quote id")
672            .is_none());
673        assert!(storage
674            .get_finalized_intent(&intent_id)
675            .await
676            .expect("get tombstone")
677            .is_some());
678
679        // A new intent for the SAME quote ID should NOT be allowed
680        let mut second = make_pending_intent(Uuid::new_v4());
681        second.quote_id = quote_id.clone();
682        let err = storage
683            .create_send_intent_if_absent(&second)
684            .await
685            .expect_err("should reject already finalized quote id");
686        assert!(matches!(err, Error::DuplicateQuoteId(_)));
687    }
688
689    // ── CRUD tests for send batch storage ─────────────────
690
691    #[tokio::test]
692    async fn test_send_batch_crud() {
693        let storage = test_storage().await;
694        let batch_id = Uuid::new_v4();
695        let intent_ids = vec![Uuid::new_v4(), Uuid::new_v4()];
696
697        let batch = SendBatchRecord {
698            batch_id,
699            state: SendBatchState::Built {
700                psbt_bytes: vec![0xAA, 0xBB],
701                intent_ids: intent_ids.clone(),
702            },
703        };
704
705        // Store
706        storage.store_send_batch(&batch).await.expect("store");
707
708        // Get
709        let fetched = storage
710            .get_send_batch(&batch_id)
711            .await
712            .expect("get")
713            .expect("should exist");
714        assert_eq!(fetched.batch_id, batch_id);
715        match &fetched.state {
716            SendBatchState::Built {
717                psbt_bytes,
718                intent_ids: ids,
719            } => {
720                assert_eq!(psbt_bytes, &vec![0xAA, 0xBB]);
721                assert_eq!(ids, &intent_ids);
722            }
723            other => panic!("Expected Built, got {:?}", other),
724        }
725
726        // Update to Signed
727        let tx_bytes = vec![0xCC, 0xDD, 0xEE];
728        let assignments: Vec<BatchOutputAssignment> = intent_ids
729            .iter()
730            .enumerate()
731            .map(|(idx, intent_id)| BatchOutputAssignment {
732                intent_id: *intent_id,
733                vout: idx as u32,
734                fee_contribution_sat: 125,
735            })
736            .collect();
737        storage
738            .update_send_batch(
739                &batch_id,
740                &SendBatchState::Signed {
741                    tx_bytes: tx_bytes.clone(),
742                    assignments: assignments.clone(),
743                    fee_sat: 250,
744                },
745            )
746            .await
747            .expect("update");
748
749        let updated = storage
750            .get_send_batch(&batch_id)
751            .await
752            .expect("get")
753            .expect("should exist");
754        match &updated.state {
755            SendBatchState::Signed {
756                tx_bytes: tb,
757                assignments: a,
758                fee_sat,
759            } => {
760                assert_eq!(tb, &tx_bytes);
761                assert_eq!(a, &assignments);
762                assert_eq!(*fee_sat, 250);
763            }
764            other => panic!("Expected Signed, got {:?}", other),
765        }
766
767        // Update to Broadcast
768        storage
769            .update_send_batch(
770                &batch_id,
771                &SendBatchState::Broadcast {
772                    txid: "txid123".to_string(),
773                    tx_bytes: tx_bytes.clone(),
774                    assignments: assignments.clone(),
775                    fee_sat: 400,
776                },
777            )
778            .await
779            .expect("update to broadcast");
780
781        let broadcast = storage
782            .get_send_batch(&batch_id)
783            .await
784            .expect("get")
785            .expect("should exist");
786        match &broadcast.state {
787            SendBatchState::Broadcast {
788                txid,
789                tx_bytes: tb,
790                assignments: a,
791                fee_sat,
792            } => {
793                assert_eq!(txid, "txid123");
794                assert_eq!(tb, &tx_bytes);
795                assert_eq!(a, &assignments);
796                assert_eq!(*fee_sat, 400);
797            }
798            other => panic!("Expected Broadcast, got {:?}", other),
799        }
800
801        // get_all
802        let all = storage.get_all_send_batches().await.expect("get_all");
803        assert_eq!(all.len(), 1);
804
805        // Delete
806        storage.delete_send_batch(&batch_id).await.expect("delete");
807        let gone = storage.get_send_batch(&batch_id).await.expect("get");
808        assert!(gone.is_none(), "Batch should be deleted");
809
810        let all = storage.get_all_send_batches().await.expect("get_all");
811        assert!(all.is_empty());
812    }
813
814    // ── Update non-existent records ───────────────────────
815
816    #[tokio::test]
817    async fn test_update_nonexistent_intent_returns_error() {
818        let storage = test_storage().await;
819        let result = storage
820            .update_send_intent(
821                &Uuid::new_v4(),
822                &SendIntentState::Pending {
823                    created_at: 1_700_000_000,
824                },
825            )
826            .await;
827        assert!(result.is_err(), "Updating nonexistent intent should fail");
828    }
829
830    #[tokio::test]
831    async fn test_update_nonexistent_batch_returns_error() {
832        let storage = test_storage().await;
833        let result = storage
834            .update_send_batch(
835                &Uuid::new_v4(),
836                &SendBatchState::Built {
837                    psbt_bytes: vec![],
838                    intent_ids: vec![],
839                },
840            )
841            .await;
842        assert!(result.is_err(), "Updating nonexistent batch should fail");
843    }
844
845    // ── Confirmation storage-level tests ──────────────────
846
847    #[tokio::test]
848    async fn test_awaiting_confirmation_intent_lookup() {
849        let storage = test_storage().await;
850        let batch_id = Uuid::new_v4();
851
852        // Store one Pending and one AwaitingConfirmation intent
853        let pending_id = Uuid::new_v4();
854        let pending = make_pending_intent(pending_id);
855        storage
856            .create_send_intent_if_absent(&pending)
857            .await
858            .expect("store pending");
859
860        let confirming_id = Uuid::new_v4();
861        let confirming = SendIntentRecord {
862            intent_id: confirming_id,
863            quote_id: "quote-confirm".to_string(),
864            address: "bcrt1qw508d6qejxtdg4y5r3zarvary0c5xw7kygt080".to_string(),
865            amount_sat: 75_000,
866            max_fee_amount_sat: 2_000,
867            tier: PaymentTier::Standard,
868            metadata: PaymentMetadata::default(),
869            state: SendIntentState::AwaitingConfirmation {
870                batch_id,
871                txid: "abc123".to_string(),
872                outpoint: "abc123:0".to_string(),
873                fee_contribution_sat: 300,
874                created_at: 1_700_000_000,
875            },
876        };
877        storage
878            .create_send_intent_if_absent(&confirming)
879            .await
880            .expect("store confirming");
881
882        // get_all returns both
883        let all = storage.get_all_send_intents().await.expect("get_all");
884        assert_eq!(all.len(), 2);
885
886        // get_pending only returns the Pending one
887        let pending = storage
888            .get_pending_send_intents()
889            .await
890            .expect("get_pending");
891        assert_eq!(pending.len(), 1);
892        assert_eq!(pending[0].intent_id, pending_id);
893
894        // Can look up AwaitingConfirmation by ID and read its fields
895        let fetched = storage
896            .get_send_intent(&confirming_id)
897            .await
898            .expect("get")
899            .expect("should exist");
900        match &fetched.state {
901            SendIntentState::AwaitingConfirmation {
902                txid,
903                outpoint,
904                fee_contribution_sat,
905                ..
906            } => {
907                assert_eq!(txid, "abc123");
908                assert_eq!(outpoint, "abc123:0");
909                assert_eq!(*fee_contribution_sat, 300);
910            }
911            other => panic!("Expected AwaitingConfirmation, got {:?}", other),
912        }
913    }
914
915    #[tokio::test]
916    async fn test_finalize_confirmed_intent_and_cleanup_batch() {
917        let storage = test_storage().await;
918        let batch_id = Uuid::new_v4();
919        let intent_id_1 = Uuid::new_v4();
920        let intent_id_2 = Uuid::new_v4();
921
922        // Store a Broadcast batch referencing two intents
923        let batch = SendBatchRecord {
924            batch_id,
925            state: SendBatchState::Broadcast {
926                txid: "tx123".to_string(),
927                tx_bytes: vec![0x01],
928                assignments: vec![
929                    BatchOutputAssignment {
930                        intent_id: intent_id_1,
931                        vout: 0,
932                        fee_contribution_sat: 250,
933                    },
934                    BatchOutputAssignment {
935                        intent_id: intent_id_2,
936                        vout: 1,
937                        fee_contribution_sat: 250,
938                    },
939                ],
940                fee_sat: 500,
941            },
942        };
943        storage.store_send_batch(&batch).await.expect("store batch");
944
945        // Store both intents as AwaitingConfirmation
946        for (id, quote) in [(intent_id_1, "q1"), (intent_id_2, "q2")] {
947            let intent = SendIntentRecord {
948                intent_id: id,
949                quote_id: quote.to_string(),
950                address: "bcrt1qaddr".to_string(),
951                amount_sat: 10_000,
952                max_fee_amount_sat: 500,
953                tier: PaymentTier::Immediate,
954                metadata: PaymentMetadata::default(),
955                state: SendIntentState::AwaitingConfirmation {
956                    batch_id,
957                    txid: "tx123".to_string(),
958                    outpoint: format!("tx123:{}", if id == intent_id_1 { 0 } else { 1 }),
959                    fee_contribution_sat: 250,
960                    created_at: 1_700_000_000,
961                },
962            };
963            storage
964                .create_send_intent_if_absent(&intent)
965                .await
966                .expect("store intent");
967        }
968
969        // "Finalize" first intent (simulating confirmation handler)
970        storage
971            .delete_send_intent(&intent_id_1)
972            .await
973            .expect("delete first");
974
975        // Batch should still exist -- second intent is still active
976        let remaining_intents = storage.get_all_send_intents().await.expect("all intents");
977        assert_eq!(remaining_intents.len(), 1);
978        assert_eq!(remaining_intents[0].intent_id, intent_id_2);
979
980        // "Finalize" second intent
981        storage
982            .delete_send_intent(&intent_id_2)
983            .await
984            .expect("delete second");
985
986        // Now simulate cleanup_completed_batches logic:
987        // Check if any intents still reference this batch
988        let all_intents = storage.get_all_send_intents().await.expect("all intents");
989        let batches = storage.get_all_send_batches().await.expect("all batches");
990        assert_eq!(batches.len(), 1);
991
992        let batch_intent_ids: Vec<Uuid> = match &batches[0].state {
993            SendBatchState::Broadcast { assignments, .. } => {
994                assignments.iter().map(|a| a.intent_id).collect()
995            }
996            _ => panic!("Expected Broadcast"),
997        };
998        let has_remaining = batch_intent_ids
999            .iter()
1000            .any(|bid| all_intents.iter().any(|i| i.intent_id == *bid));
1001        assert!(!has_remaining, "All intents finalized");
1002
1003        // Clean up batch
1004        storage
1005            .delete_send_batch(&batch_id)
1006            .await
1007            .expect("delete batch");
1008        let batches = storage.get_all_send_batches().await.expect("all batches");
1009        assert!(batches.is_empty());
1010    }
1011
1012    // ── Recovery storage-level tests ─────────────────────
1013
1014    #[tokio::test]
1015    async fn test_pre_broadcast_recovery_reverts_intents() {
1016        let storage = test_storage().await;
1017        let signed_intent_id = Uuid::new_v4();
1018        for state in [
1019            SendBatchState::Built {
1020                psbt_bytes: vec![0x01, 0x02],
1021                intent_ids: vec![Uuid::new_v4(), Uuid::new_v4()],
1022            },
1023            SendBatchState::Signed {
1024                tx_bytes: vec![0xAA, 0xBB],
1025                assignments: vec![BatchOutputAssignment {
1026                    intent_id: signed_intent_id,
1027                    vout: 0,
1028                    fee_contribution_sat: 100,
1029                }],
1030                fee_sat: 100,
1031            },
1032        ] {
1033            let batch_id = Uuid::new_v4();
1034            let intent_ids: Vec<Uuid> = match &state {
1035                SendBatchState::Built { intent_ids, .. } => intent_ids.clone(),
1036                SendBatchState::Signed { assignments, .. } => {
1037                    assignments.iter().map(|a| a.intent_id).collect()
1038                }
1039                SendBatchState::Broadcast { .. } => unreachable!(),
1040            };
1041
1042            let batch = SendBatchRecord { batch_id, state };
1043            storage.store_send_batch(&batch).await.expect("store batch");
1044
1045            for intent_id in &intent_ids {
1046                let intent = SendIntentRecord {
1047                    intent_id: *intent_id,
1048                    quote_id: format!("q-{}", intent_id),
1049                    address: "bcrt1qaddr".to_string(),
1050                    amount_sat: 25_000,
1051                    max_fee_amount_sat: 500,
1052                    tier: PaymentTier::Immediate,
1053                    metadata: PaymentMetadata::default(),
1054                    state: SendIntentState::Batched {
1055                        batch_id,
1056                        created_at: 1_700_000_000,
1057                    },
1058                };
1059                storage
1060                    .create_send_intent_if_absent(&intent)
1061                    .await
1062                    .expect("store intent");
1063            }
1064
1065            for intent_id in &intent_ids {
1066                storage
1067                    .update_send_intent(
1068                        intent_id,
1069                        &SendIntentState::Pending {
1070                            created_at: 1_700_000_000,
1071                        },
1072                    )
1073                    .await
1074                    .expect("revert intent");
1075            }
1076
1077            storage
1078                .delete_send_batch(&batch_id)
1079                .await
1080                .expect("delete batch");
1081
1082            let batches = storage.get_all_send_batches().await.expect("all batches");
1083            assert!(batches.iter().all(|b| b.batch_id != batch_id));
1084
1085            for intent_id in intent_ids {
1086                let intent = storage
1087                    .get_send_intent(&intent_id)
1088                    .await
1089                    .expect("get intent")
1090                    .expect("intent exists");
1091                assert!(matches!(intent.state, SendIntentState::Pending { .. }));
1092                storage
1093                    .delete_send_intent(&intent_id)
1094                    .await
1095                    .expect("cleanup intent");
1096            }
1097        }
1098    }
1099
1100    #[tokio::test]
1101    async fn test_post_broadcast_and_orphaned_recovery_storage_shapes() {
1102        let storage = test_storage().await;
1103        let broadcast_batch_id = Uuid::new_v4();
1104        let broadcast_intent_id = Uuid::new_v4();
1105        let orphan_batch_id = Uuid::new_v4();
1106        let orphan_intent_id = Uuid::new_v4();
1107
1108        let batch = SendBatchRecord {
1109            batch_id: broadcast_batch_id,
1110            state: SendBatchState::Broadcast {
1111                txid: "txid_broadcast".to_string(),
1112                tx_bytes: vec![0x01, 0x02, 0x03],
1113                assignments: vec![BatchOutputAssignment {
1114                    intent_id: broadcast_intent_id,
1115                    vout: 0,
1116                    fee_contribution_sat: 200,
1117                }],
1118                fee_sat: 200,
1119            },
1120        };
1121        storage.store_send_batch(&batch).await.expect("store batch");
1122
1123        let awaiting_intent = SendIntentRecord {
1124            intent_id: broadcast_intent_id,
1125            quote_id: "q-broadcast".to_string(),
1126            address: "bcrt1qaddr".to_string(),
1127            amount_sat: 40_000,
1128            max_fee_amount_sat: 800,
1129            tier: PaymentTier::Economy,
1130            metadata: PaymentMetadata::default(),
1131            state: SendIntentState::AwaitingConfirmation {
1132                batch_id: broadcast_batch_id,
1133                txid: "txid_broadcast".to_string(),
1134                outpoint: "txid_broadcast:0".to_string(),
1135                fee_contribution_sat: 200,
1136                created_at: 1_700_000_000,
1137            },
1138        };
1139        storage
1140            .create_send_intent_if_absent(&awaiting_intent)
1141            .await
1142            .expect("store awaiting intent");
1143
1144        let orphan_intent = SendIntentRecord {
1145            intent_id: orphan_intent_id,
1146            quote_id: "q-orphan".to_string(),
1147            address: "bcrt1qaddr".to_string(),
1148            amount_sat: 20_000,
1149            max_fee_amount_sat: 400,
1150            tier: PaymentTier::Immediate,
1151            metadata: PaymentMetadata::default(),
1152            state: SendIntentState::Batched {
1153                batch_id: orphan_batch_id,
1154                created_at: 1_700_000_000,
1155            },
1156        };
1157        storage
1158            .create_send_intent_if_absent(&orphan_intent)
1159            .await
1160            .expect("store orphan intent");
1161
1162        let batches = storage.get_all_send_batches().await.expect("all batches");
1163        assert_eq!(batches.len(), 1);
1164        assert!(matches!(batches[0].state, SendBatchState::Broadcast { .. }));
1165
1166        let awaiting = storage
1167            .get_send_intent(&broadcast_intent_id)
1168            .await
1169            .expect("get awaiting")
1170            .expect("awaiting exists");
1171        assert!(matches!(
1172            awaiting.state,
1173            SendIntentState::AwaitingConfirmation { .. }
1174        ));
1175
1176        storage
1177            .update_send_intent(
1178                &orphan_intent_id,
1179                &SendIntentState::Pending {
1180                    created_at: 1_700_000_000,
1181                },
1182            )
1183            .await
1184            .expect("revert orphan");
1185
1186        let orphan = storage
1187            .get_send_intent(&orphan_intent_id)
1188            .await
1189            .expect("get orphan")
1190            .expect("orphan exists");
1191        assert!(matches!(orphan.state, SendIntentState::Pending { .. }));
1192    }
1193
1194    #[tokio::test]
1195    async fn test_recovery_shape_batch_can_reference_missing_intent() {
1196        let storage = test_storage().await;
1197        let batch_id = Uuid::new_v4();
1198        let present_intent_id = Uuid::new_v4();
1199        let missing_intent_id = Uuid::new_v4();
1200
1201        let batch = SendBatchRecord {
1202            batch_id,
1203            state: SendBatchState::Built {
1204                psbt_bytes: vec![0x01, 0x02],
1205                intent_ids: vec![present_intent_id, missing_intent_id],
1206            },
1207        };
1208        storage.store_send_batch(&batch).await.expect("store batch");
1209
1210        let intent = SendIntentRecord {
1211            intent_id: present_intent_id,
1212            quote_id: "q-present".to_string(),
1213            address: "bcrt1qaddr".to_string(),
1214            amount_sat: 25_000,
1215            max_fee_amount_sat: 500,
1216            tier: PaymentTier::Immediate,
1217            metadata: PaymentMetadata::default(),
1218            state: SendIntentState::Batched {
1219                batch_id,
1220                created_at: 1_700_000_000,
1221            },
1222        };
1223        storage
1224            .create_send_intent_if_absent(&intent)
1225            .await
1226            .expect("store present intent");
1227
1228        let stored_batch = storage
1229            .get_send_batch(&batch_id)
1230            .await
1231            .expect("get batch")
1232            .expect("batch exists");
1233        match stored_batch.state {
1234            SendBatchState::Built { intent_ids, .. } => {
1235                assert_eq!(intent_ids.len(), 2);
1236                assert!(intent_ids.contains(&missing_intent_id));
1237            }
1238            _ => panic!("expected built batch"),
1239        }
1240    }
1241
1242    #[tokio::test]
1243    async fn test_recovery_shape_intent_can_reference_missing_batch() {
1244        let storage = test_storage().await;
1245        let batch_id = Uuid::new_v4();
1246        let intent_id = Uuid::new_v4();
1247
1248        let intent = SendIntentRecord {
1249            intent_id,
1250            quote_id: "q-missing-batch".to_string(),
1251            address: "bcrt1qaddr".to_string(),
1252            amount_sat: 15_000,
1253            max_fee_amount_sat: 300,
1254            tier: PaymentTier::Immediate,
1255            metadata: PaymentMetadata::default(),
1256            state: SendIntentState::Batched {
1257                batch_id,
1258                created_at: 1_700_000_000,
1259            },
1260        };
1261        storage
1262            .create_send_intent_if_absent(&intent)
1263            .await
1264            .expect("store intent");
1265
1266        let stored = storage
1267            .get_send_intent(&intent_id)
1268            .await
1269            .expect("get intent")
1270            .expect("intent exists");
1271        match stored.state {
1272            SendIntentState::Batched {
1273                batch_id: stored_batch_id,
1274                ..
1275            } => {
1276                assert_eq!(stored_batch_id, batch_id);
1277            }
1278            _ => panic!("expected batched intent"),
1279        }
1280
1281        assert!(
1282            storage
1283                .get_send_batch(&batch_id)
1284                .await
1285                .expect("get batch")
1286                .is_none(),
1287            "batch should be missing for orphan intent scenario"
1288        );
1289    }
1290
1291    #[tokio::test]
1292    async fn test_recovery_shape_batch_and_intent_can_disagree_on_membership() {
1293        let storage = test_storage().await;
1294        let referenced_batch_id = Uuid::new_v4();
1295        let actual_batch_id = Uuid::new_v4();
1296        let intent_id = Uuid::new_v4();
1297
1298        let batch = SendBatchRecord {
1299            batch_id: actual_batch_id,
1300            state: SendBatchState::Broadcast {
1301                txid: "txid_membership".to_string(),
1302                tx_bytes: vec![0x01, 0x02, 0x03],
1303                assignments: Vec::new(),
1304                fee_sat: 200,
1305            },
1306        };
1307        storage.store_send_batch(&batch).await.expect("store batch");
1308
1309        let intent = SendIntentRecord {
1310            intent_id,
1311            quote_id: "q-membership".to_string(),
1312            address: "bcrt1qaddr".to_string(),
1313            amount_sat: 30_000,
1314            max_fee_amount_sat: 700,
1315            tier: PaymentTier::Standard,
1316            metadata: PaymentMetadata::default(),
1317            state: SendIntentState::AwaitingConfirmation {
1318                batch_id: referenced_batch_id,
1319                txid: "txid_membership".to_string(),
1320                outpoint: "txid_membership:0".to_string(),
1321                fee_contribution_sat: 200,
1322                created_at: 1_700_000_000,
1323            },
1324        };
1325        storage
1326            .create_send_intent_if_absent(&intent)
1327            .await
1328            .expect("store intent");
1329
1330        let stored_batch = storage
1331            .get_send_batch(&actual_batch_id)
1332            .await
1333            .expect("get batch")
1334            .expect("batch exists");
1335        match stored_batch.state {
1336            SendBatchState::Broadcast { assignments, .. } => {
1337                assert!(
1338                    assignments.is_empty(),
1339                    "batch intentionally excludes the intent"
1340                );
1341            }
1342            _ => panic!("expected broadcast batch"),
1343        }
1344
1345        let stored_intent = storage
1346            .get_send_intent(&intent_id)
1347            .await
1348            .expect("get intent")
1349            .expect("intent exists");
1350        match stored_intent.state {
1351            SendIntentState::AwaitingConfirmation { batch_id, .. } => {
1352                assert_eq!(batch_id, referenced_batch_id);
1353            }
1354            _ => panic!("expected awaiting confirmation intent"),
1355        }
1356    }
1357
1358    #[tokio::test]
1359    async fn test_recovery_shape_batch_can_have_mixed_intent_states() {
1360        let storage = test_storage().await;
1361        let batch_id = Uuid::new_v4();
1362        let batched_intent_id = Uuid::new_v4();
1363        let awaiting_intent_id = Uuid::new_v4();
1364
1365        let batch = SendBatchRecord {
1366            batch_id,
1367            state: SendBatchState::Broadcast {
1368                txid: "txid_mixed".to_string(),
1369                tx_bytes: vec![0x01, 0x02, 0x03],
1370                assignments: vec![
1371                    BatchOutputAssignment {
1372                        intent_id: batched_intent_id,
1373                        vout: 0,
1374                        fee_contribution_sat: 200,
1375                    },
1376                    BatchOutputAssignment {
1377                        intent_id: awaiting_intent_id,
1378                        vout: 1,
1379                        fee_contribution_sat: 200,
1380                    },
1381                ],
1382                fee_sat: 400,
1383            },
1384        };
1385        storage.store_send_batch(&batch).await.expect("store batch");
1386
1387        for (intent_id, state) in [
1388            (
1389                batched_intent_id,
1390                SendIntentState::Batched {
1391                    batch_id,
1392                    created_at: 1_700_000_000,
1393                },
1394            ),
1395            (
1396                awaiting_intent_id,
1397                SendIntentState::AwaitingConfirmation {
1398                    batch_id,
1399                    txid: "txid_mixed".to_string(),
1400                    outpoint: "txid_mixed:1".to_string(),
1401                    fee_contribution_sat: 200,
1402                    created_at: 1_700_000_000,
1403                },
1404            ),
1405        ] {
1406            let intent = SendIntentRecord {
1407                intent_id,
1408                quote_id: format!("q-{}", intent_id),
1409                address: "bcrt1qaddr".to_string(),
1410                amount_sat: 10_000,
1411                max_fee_amount_sat: 500,
1412                tier: PaymentTier::Immediate,
1413                metadata: PaymentMetadata::default(),
1414                state,
1415            };
1416            storage
1417                .create_send_intent_if_absent(&intent)
1418                .await
1419                .expect("store intent");
1420        }
1421
1422        let intents = storage.get_all_send_intents().await.expect("all intents");
1423        assert_eq!(intents.len(), 2);
1424        assert!(intents
1425            .iter()
1426            .any(|intent| matches!(intent.state, SendIntentState::Batched { .. })));
1427        assert!(intents
1428            .iter()
1429            .any(|intent| matches!(intent.state, SendIntentState::AwaitingConfirmation { .. })));
1430    }
1431
1432    // ── Receive saga: serialization round-trip tests ─────────────────
1433
1434    #[test]
1435    fn test_receive_intent_record_state_roundtrip() {
1436        use crate::receive::receive_intent::record::{ReceiveIntentRecord, ReceiveIntentState};
1437
1438        let state = ReceiveIntentState::Detected {
1439            address: "bcrt1qaddr".to_string(),
1440            txid: "abc123".to_string(),
1441            outpoint: "abc123:0".to_string(),
1442            amount_sat: 50_000,
1443            block_height: 100,
1444            created_at: 1_700_000_000,
1445        };
1446        let json = serde_json::to_string(&state).expect("serialize state");
1447        let deserialized: ReceiveIntentState =
1448            serde_json::from_str(&json).expect("deserialize state");
1449        let json2 = serde_json::to_string(&deserialized).expect("re-serialize");
1450        assert_eq!(json, json2, "Round-trip failed for receive intent state");
1451
1452        // Full intent round-trip
1453        let intent = ReceiveIntentRecord {
1454            intent_id: Uuid::new_v4(),
1455            quote_id: Uuid::new_v4().to_string(),
1456            state,
1457        };
1458        let json = serde_json::to_string(&intent).expect("serialize intent");
1459        let deserialized: ReceiveIntentRecord =
1460            serde_json::from_str(&json).expect("deserialize intent");
1461        assert_eq!(intent.intent_id, deserialized.intent_id);
1462        assert_eq!(intent.quote_id, deserialized.quote_id);
1463    }
1464
1465    #[test]
1466    fn test_finalized_receive_intent_roundtrip() {
1467        let tombstone = FinalizedReceiveIntentRecord {
1468            intent_id: Uuid::new_v4(),
1469            quote_id: Uuid::new_v4().to_string(),
1470            address: "bcrt1qaddr".to_string(),
1471            txid: "abc123".to_string(),
1472            outpoint: "abc123:0".to_string(),
1473            amount_sat: 50_000,
1474            finalized_at: 1_700_000_001,
1475        };
1476        let json = serde_json::to_string(&tombstone).expect("serialize");
1477        let deserialized: FinalizedReceiveIntentRecord =
1478            serde_json::from_str(&json).expect("deserialize");
1479        assert_eq!(tombstone.intent_id, deserialized.intent_id);
1480        assert_eq!(tombstone.quote_id, deserialized.quote_id);
1481        assert_eq!(tombstone.address, deserialized.address);
1482        assert_eq!(tombstone.txid, deserialized.txid);
1483        assert_eq!(tombstone.outpoint, deserialized.outpoint);
1484        assert_eq!(tombstone.amount_sat, deserialized.amount_sat);
1485        assert_eq!(tombstone.finalized_at, deserialized.finalized_at);
1486    }
1487
1488    // ── Receive saga: address index tests ────────────────────────────
1489
1490    #[tokio::test]
1491    async fn test_receive_address_quote_id_index() {
1492        let storage = test_storage().await;
1493
1494        let q1 = Uuid::new_v4().to_string();
1495        let q2 = Uuid::new_v4().to_string();
1496
1497        storage
1498            .track_receive_address("bcrt1qaddr1", &q1)
1499            .await
1500            .expect("track addr1");
1501        storage
1502            .track_receive_address("bcrt1qaddr2", &q2)
1503            .await
1504            .expect("track addr2");
1505
1506        let fetched = storage
1507            .get_quote_id_by_receive_address("bcrt1qaddr1")
1508            .await
1509            .expect("get by address")
1510            .expect("should exist");
1511        assert_eq!(fetched, q1);
1512
1513        let fetched2 = storage
1514            .get_quote_id_by_receive_address("bcrt1qaddr2")
1515            .await
1516            .expect("get by address")
1517            .expect("should exist");
1518        assert_eq!(fetched2, q2);
1519
1520        let missing = storage
1521            .get_quote_id_by_receive_address("unknown")
1522            .await
1523            .expect("get by address");
1524        assert!(missing.is_none());
1525    }
1526
1527    // ── Receive saga: intent CRUD tests ──────────────────────────────
1528
1529    #[tokio::test]
1530    async fn test_receive_intent_crud() {
1531        use crate::receive::receive_intent::record::{ReceiveIntentRecord, ReceiveIntentState};
1532
1533        let storage = test_storage().await;
1534        let intent_id = Uuid::new_v4();
1535        let quote_id = Uuid::new_v4().to_string();
1536        let intent = ReceiveIntentRecord {
1537            intent_id,
1538            quote_id: quote_id.clone(),
1539            state: ReceiveIntentState::Detected {
1540                address: "bcrt1qaddr".to_string(),
1541                txid: "txid_abc".to_string(),
1542                outpoint: "txid_abc:0".to_string(),
1543                amount_sat: 50_000,
1544                block_height: 100,
1545                created_at: 1_700_000_000,
1546            },
1547        };
1548
1549        // Create
1550        let created = storage
1551            .create_receive_intent_if_absent(&intent)
1552            .await
1553            .expect("create");
1554        assert!(created);
1555
1556        // Get
1557        let fetched = storage
1558            .get_receive_intent(&intent_id)
1559            .await
1560            .expect("get")
1561            .expect("should exist");
1562        assert_eq!(fetched.intent_id, intent_id);
1563        assert_eq!(fetched.quote_id, quote_id);
1564
1565        // Get all
1566        let all = storage.get_all_receive_intents().await.expect("get all");
1567        assert_eq!(all.len(), 1);
1568
1569        // Delete
1570        storage
1571            .delete_receive_intent(&intent_id)
1572            .await
1573            .expect("delete");
1574        let gone = storage.get_receive_intent(&intent_id).await.expect("get");
1575        assert!(gone.is_none());
1576    }
1577
1578    #[tokio::test]
1579    async fn test_receive_intent_duplicate_outpoint_rejection() {
1580        use crate::receive::receive_intent::record::{ReceiveIntentRecord, ReceiveIntentState};
1581
1582        let storage = test_storage().await;
1583
1584        let intent1 = ReceiveIntentRecord {
1585            intent_id: Uuid::new_v4(),
1586            quote_id: Uuid::new_v4().to_string(),
1587            state: ReceiveIntentState::Detected {
1588                address: "bcrt1qaddr".to_string(),
1589                txid: "txid_abc".to_string(),
1590                outpoint: "txid_abc:0".to_string(),
1591                amount_sat: 50_000,
1592                block_height: 100,
1593                created_at: 1_700_000_000,
1594            },
1595        };
1596
1597        let intent2 = ReceiveIntentRecord {
1598            intent_id: Uuid::new_v4(),
1599            quote_id: Uuid::new_v4().to_string(),
1600            state: ReceiveIntentState::Detected {
1601                address: "bcrt1qaddr".to_string(),
1602                txid: "txid_abc".to_string(),
1603                outpoint: "txid_abc:0".to_string(), // same outpoint
1604                amount_sat: 50_000,
1605                block_height: 100,
1606                created_at: 1_700_000_001,
1607            },
1608        };
1609
1610        let created1 = storage
1611            .create_receive_intent_if_absent(&intent1)
1612            .await
1613            .expect("create first");
1614        assert!(created1);
1615
1616        let created2 = storage
1617            .create_receive_intent_if_absent(&intent2)
1618            .await
1619            .expect("create second (should not error)");
1620        assert!(!created2, "Duplicate outpoint should be rejected");
1621
1622        // Only one intent should exist
1623        let all = storage.get_all_receive_intents().await.expect("get all");
1624        assert_eq!(all.len(), 1);
1625        assert_eq!(all[0].intent_id, intent1.intent_id);
1626    }
1627
1628    #[tokio::test]
1629    async fn test_finalize_receive_intent_atomicity() {
1630        use crate::receive::receive_intent::record::{ReceiveIntentRecord, ReceiveIntentState};
1631
1632        let storage = test_storage().await;
1633        let intent_id = Uuid::new_v4();
1634        let quote_id = Uuid::new_v4().to_string();
1635
1636        let intent = ReceiveIntentRecord {
1637            intent_id,
1638            quote_id: quote_id.clone(),
1639            state: ReceiveIntentState::Detected {
1640                address: "bcrt1qaddr".to_string(),
1641                txid: "txid_abc".to_string(),
1642                outpoint: "txid_abc:0".to_string(),
1643                amount_sat: 50_000,
1644                block_height: 100,
1645                created_at: 1_700_000_000,
1646            },
1647        };
1648
1649        storage
1650            .create_receive_intent_if_absent(&intent)
1651            .await
1652            .expect("create");
1653
1654        let tombstone = FinalizedReceiveIntentRecord {
1655            intent_id,
1656            quote_id: quote_id.clone(),
1657            address: "bcrt1qaddr".to_string(),
1658            txid: "txid_abc".to_string(),
1659            outpoint: "txid_abc:0".to_string(),
1660            amount_sat: 50_000,
1661            finalized_at: 1_700_000_001,
1662        };
1663
1664        storage
1665            .finalize_receive_intent(&intent_id, &tombstone)
1666            .await
1667            .expect("finalize");
1668
1669        // Active record should be gone
1670        assert!(storage
1671            .get_receive_intent(&intent_id)
1672            .await
1673            .expect("get")
1674            .is_none());
1675
1676        // Tombstone should exist
1677        let fetched_tombstone = storage
1678            .get_finalized_receive_intent(&intent_id)
1679            .await
1680            .expect("get tombstone")
1681            .expect("tombstone should exist");
1682        assert_eq!(fetched_tombstone.intent_id, intent_id);
1683        assert_eq!(fetched_tombstone.amount_sat, 50_000);
1684
1685        // Outpoint should NOT be freed (cannot create a new intent with same outpoint)
1686        let intent2 = ReceiveIntentRecord {
1687            intent_id: Uuid::new_v4(),
1688            quote_id: Uuid::new_v4().to_string(),
1689            state: ReceiveIntentState::Detected {
1690                address: "bcrt1qaddr".to_string(),
1691                txid: "txid_abc".to_string(),
1692                outpoint: "txid_abc:0".to_string(),
1693                amount_sat: 60_000,
1694                block_height: 200,
1695                created_at: 1_700_000_002,
1696            },
1697        };
1698        let created = storage
1699            .create_receive_intent_if_absent(&intent2)
1700            .await
1701            .expect("create after finalization");
1702        assert!(
1703            !created,
1704            "Should NOT be able to create intent after outpoint is finalized"
1705        );
1706
1707        // A DIFFERENT outpoint for the SAME quote ID should be allowed
1708        let intent3 = ReceiveIntentRecord {
1709            intent_id: Uuid::new_v4(),
1710            quote_id: quote_id.clone(),
1711            state: ReceiveIntentState::Detected {
1712                address: "bcrt1qaddr".to_string(),
1713                txid: "txid_abc".to_string(),
1714                outpoint: "txid_abc:1".to_string(), // different vout
1715                amount_sat: 50_000,
1716                block_height: 100,
1717                created_at: 1_700_000_003,
1718            },
1719        };
1720        let created3 = storage
1721            .create_receive_intent_if_absent(&intent3)
1722            .await
1723            .expect("create different outpoint same quote");
1724        assert!(
1725            created3,
1726            "Should be able to create intent for a different outpoint even if same quote ID"
1727        );
1728    }
1729
1730    #[tokio::test]
1731    async fn test_tombstone_query_by_quote_id() {
1732        use crate::receive::receive_intent::record::{ReceiveIntentRecord, ReceiveIntentState};
1733
1734        let storage = test_storage().await;
1735
1736        // Create and finalize two intents for the same quote id
1737        let shared_quote_id = Uuid::new_v4().to_string();
1738        for (i, outpoint) in ["txid_a:0", "txid_b:1"].iter().enumerate() {
1739            let intent_id = Uuid::new_v4();
1740            let intent = ReceiveIntentRecord {
1741                intent_id,
1742                quote_id: shared_quote_id.to_string(),
1743                state: ReceiveIntentState::Detected {
1744                    address: "bcrt1qshared".to_string(),
1745                    txid: format!("txid_{}", i),
1746                    outpoint: outpoint.to_string(),
1747                    amount_sat: 10_000 * (i as u64 + 1),
1748                    block_height: 100 + i as u32,
1749                    created_at: 1_700_000_000 + i as u64,
1750                },
1751            };
1752            storage
1753                .create_receive_intent_if_absent(&intent)
1754                .await
1755                .expect("create");
1756
1757            let tombstone = FinalizedReceiveIntentRecord {
1758                intent_id,
1759                quote_id: shared_quote_id.to_string(),
1760                address: "bcrt1qshared".to_string(),
1761                txid: format!("txid_{}", i),
1762                outpoint: outpoint.to_string(),
1763                amount_sat: 10_000 * (i as u64 + 1),
1764                finalized_at: 1_700_000_010 + i as u64,
1765            };
1766            storage
1767                .finalize_receive_intent(&intent_id, &tombstone)
1768                .await
1769                .expect("finalize");
1770        }
1771
1772        // Also create and finalize one for a different quote id
1773        let other_id = Uuid::new_v4();
1774        let other_quote_id = Uuid::new_v4().to_string();
1775        let other = ReceiveIntentRecord {
1776            intent_id: other_id,
1777            quote_id: other_quote_id.to_string(),
1778            state: ReceiveIntentState::Detected {
1779                address: "bcrt1qother".to_string(),
1780                txid: "txid_c".to_string(),
1781                outpoint: "txid_c:0".to_string(),
1782                amount_sat: 99_000,
1783                block_height: 300,
1784                created_at: 1_700_000_100,
1785            },
1786        };
1787        storage
1788            .create_receive_intent_if_absent(&other)
1789            .await
1790            .expect("create other");
1791        storage
1792            .finalize_receive_intent(
1793                &other_id,
1794                &FinalizedReceiveIntentRecord {
1795                    intent_id: other_id,
1796                    quote_id: other_quote_id.to_string(),
1797                    address: "bcrt1qother".to_string(),
1798                    txid: "txid_c".to_string(),
1799                    outpoint: "txid_c:0".to_string(),
1800                    amount_sat: 99_000,
1801                    finalized_at: 1_700_000_200,
1802                },
1803            )
1804            .await
1805            .expect("finalize other");
1806
1807        // Query by quote id should return only matching ones
1808        let shared = storage
1809            .get_finalized_receive_intents_by_quote_id(&shared_quote_id)
1810            .await
1811            .expect("query shared");
1812        assert_eq!(shared.len(), 2);
1813        assert!(shared.iter().all(|t| t.quote_id == shared_quote_id));
1814
1815        let other_results = storage
1816            .get_finalized_receive_intents_by_quote_id(&other_quote_id)
1817            .await
1818            .expect("query other");
1819        assert_eq!(other_results.len(), 1);
1820        assert_eq!(other_results[0].amount_sat, 99_000);
1821
1822        // Unknown quote id returns empty
1823        let unknown = storage
1824            .get_finalized_receive_intents_by_quote_id("unknown")
1825            .await
1826            .expect("query unknown");
1827        assert!(unknown.is_empty());
1828    }
1829
1830    /// Regression test for the quote-id index write-skew race that affected
1831    /// `finalize_receive_intent` on Postgres `READ COMMITTED`. SQLite
1832    /// serializes writers via `BEGIN IMMEDIATE`, so this test exercises the
1833    /// new code path rather than reproducing the bug; the structural
1834    /// guarantee (one key per intent, no RMW) is what actually fixes
1835    /// Postgres.
1836    #[tokio::test]
1837    async fn test_finalize_receive_intent_concurrent_same_quote_id() {
1838        use crate::receive::receive_intent::record::{ReceiveIntentRecord, ReceiveIntentState};
1839
1840        let storage = test_storage().await;
1841        let shared_quote_id = Uuid::new_v4().to_string();
1842
1843        // Pre-create two active intents under the same quote id.
1844        let mut intent_ids = Vec::new();
1845        for (i, outpoint) in ["txid_a:0", "txid_b:1"].iter().enumerate() {
1846            let intent_id = Uuid::new_v4();
1847            intent_ids.push((intent_id, outpoint.to_string(), i));
1848            let intent = ReceiveIntentRecord {
1849                intent_id,
1850                quote_id: shared_quote_id.to_string(),
1851                state: ReceiveIntentState::Detected {
1852                    address: "bcrt1qshared".to_string(),
1853                    txid: format!("txid_{}", i),
1854                    outpoint: outpoint.to_string(),
1855                    amount_sat: 10_000 * (i as u64 + 1),
1856                    block_height: 100 + i as u32,
1857                    created_at: 1_700_000_000 + i as u64,
1858                },
1859            };
1860            storage
1861                .create_receive_intent_if_absent(&intent)
1862                .await
1863                .expect("create");
1864        }
1865
1866        // Finalize both concurrently.
1867        let storage_a = storage.clone();
1868        let quote_a = shared_quote_id.clone();
1869        let (intent_a, outpoint_a, i_a) = intent_ids[0].clone();
1870        let task_a = tokio::spawn(async move {
1871            let record = FinalizedReceiveIntentRecord {
1872                intent_id: intent_a,
1873                quote_id: quote_a,
1874                address: "bcrt1qshared".to_string(),
1875                txid: format!("txid_{}", i_a),
1876                outpoint: outpoint_a,
1877                amount_sat: 10_000 * (i_a as u64 + 1),
1878                finalized_at: 1_700_000_010 + i_a as u64,
1879            };
1880            storage_a.finalize_receive_intent(&intent_a, &record).await
1881        });
1882
1883        let storage_b = storage.clone();
1884        let quote_b = shared_quote_id.clone();
1885        let (intent_b, outpoint_b, i_b) = intent_ids[1].clone();
1886        let task_b = tokio::spawn(async move {
1887            let record = FinalizedReceiveIntentRecord {
1888                intent_id: intent_b,
1889                quote_id: quote_b,
1890                address: "bcrt1qshared".to_string(),
1891                txid: format!("txid_{}", i_b),
1892                outpoint: outpoint_b,
1893                amount_sat: 10_000 * (i_b as u64 + 1),
1894                finalized_at: 1_700_000_010 + i_b as u64,
1895            };
1896            storage_b.finalize_receive_intent(&intent_b, &record).await
1897        });
1898
1899        task_a.await.expect("join a").expect("finalize a");
1900        task_b.await.expect("join b").expect("finalize b");
1901
1902        // Both intent_ids must be recoverable from the quote-id index.
1903        let results = storage
1904            .get_finalized_receive_intents_by_quote_id(&shared_quote_id)
1905            .await
1906            .expect("query shared");
1907        assert_eq!(
1908            results.len(),
1909            2,
1910            "both concurrently finalized intents must appear in the quote-id index"
1911        );
1912        let returned_ids: std::collections::HashSet<Uuid> =
1913            results.iter().map(|r| r.intent_id).collect();
1914        assert!(returned_ids.contains(&intent_ids[0].0));
1915        assert!(returned_ids.contains(&intent_ids[1].0));
1916    }
1917}