bdk-cli 3.0.0

An experimental CLI wallet application and playground, powered by BDK
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
use crate::error::BDKCliError as Error;
use crate::handlers::{broadcast_transaction, sync_wallet};
use crate::utils::BlockchainClient;
use bdk_wallet::{
    SignOptions, Wallet,
    bitcoin::{FeeRate, Psbt, Txid, consensus::encode::serialize_hex},
};
use payjoin::bitcoin::TxIn;
use payjoin::persist::{OptionalTransitionOutcome, SessionPersister};
use payjoin::receive::InputPair;
use payjoin::receive::v2::{
    HasReplyableError, Initialized, MaybeInputsOwned, MaybeInputsSeen, Monitor, OutputsUnknown,
    PayjoinProposal, ProvisionalProposal, ReceiveSession, Receiver,
    SessionEvent as ReceiverSessionEvent, UncheckedOriginalPayload, WantsFeeRange, WantsInputs,
    WantsOutputs,
};
use payjoin::send::v2::{
    PollingForProposal, SendSession, Sender, SessionEvent as SenderSessionEvent,
    SessionOutcome as SenderSessionOutcome, WithReplyKey,
};
use payjoin::{ImplementationError, UriExt};
use serde_json::{json, to_string_pretty};
use std::sync::{Arc, Mutex};

use crate::payjoin::ohttp::{RelayManager, fetch_ohttp_keys};

pub mod ohttp;

/// Implements all of the functions required to go through the Payjoin receive and send processes.
///
/// Each step of a session lives in its own method, and
/// [`PayjoinManager::proceed_sender_session`] and [`PayjoinManager::proceed_receiver_session`]
/// dispatch on the current session state, so that the manager could resume an ongoing Payjoin
/// from any state as well.
///
/// TODO: This layout is meant to make a persister implementation easier, but the persister is
/// not implemented yet: the send and receive entry points use a `NoopSessionPersister`, so
/// sessions cannot actually be resumed for now.
pub(crate) struct PayjoinManager<'a> {
    /// Wallet used to derive addresses and to build, sign and look up transactions.
    wallet: &'a mut Wallet,
    /// Shared relay manager; queried for the currently selected OHTTP relay.
    relay_manager: Arc<Mutex<RelayManager>>,
}

impl<'a> PayjoinManager<'a> {
    pub fn new(wallet: &'a mut Wallet, relay_manager: Arc<Mutex<RelayManager>>) -> Self {
        Self {
            wallet,
            relay_manager,
        }
    }

    pub async fn receive_payjoin(
        &mut self,
        amount: u64,
        directory: String,
        max_fee_rate: Option<u64>,
        ohttp_relays: Vec<String>,
        blockchain_client: &BlockchainClient,
    ) -> Result<String, Error> {
        let address = self
            .wallet
            .next_unused_address(bdk_wallet::KeychainKind::External);

        let ohttp_relays: Vec<url::Url> = ohttp_relays
            .into_iter()
            .map(|s| url::Url::parse(&s))
            .collect::<Result<_, _>>()
            .map_err(|e| Error::Generic(format!("Failed to parse one or more OHTTP URLs: {e}")))?;

        if ohttp_relays.is_empty() {
            return Err(Error::Generic(
                "At least one valid OHTTP relay must be provided.".into(),
            ));
        }

        let ohttp_keys =
            fetch_ohttp_keys(ohttp_relays, &directory, self.relay_manager.clone()).await?;
        // TODO: Implement proper persister.
        let persister = payjoin::persist::NoopSessionPersister::<ReceiverSessionEvent>::default();

        let checked_max_fee_rate = max_fee_rate
            .map(FeeRate::from_sat_per_kwu)
            .unwrap_or(FeeRate::BROADCAST_MIN);

        let receiver = payjoin::receive::v2::ReceiverBuilder::new(
            address.address,
            directory,
            ohttp_keys.ohttp_keys,
        )?
        .with_amount(payjoin::bitcoin::Amount::from_sat(amount))
        .with_max_fee_rate(checked_max_fee_rate)
        .build()
        .save(&persister)
        .map_err(|e| {
            Error::Generic(format!(
                "Failed to persister the receiver after initialization: {e}"
            ))
        })?;

        let pj_uri = receiver.pj_uri();
        println!("Request Payjoin by sharing this Payjoin Uri:");
        println!("{pj_uri}");

        self.proceed_receiver_session(
            ReceiveSession::Initialized(receiver.clone()),
            &persister,
            ohttp_keys.relay_url.to_string(),
            checked_max_fee_rate,
            blockchain_client,
        )
        .await?;

        Ok(to_string_pretty(&json!({}))?)
    }

    pub async fn send_payjoin(
        &mut self,
        uri: String,
        fee_rate: u64,
        ohttp_relays: Vec<String>,
        blockchain_client: &BlockchainClient,
    ) -> Result<String, Error> {
        let uri = payjoin::Uri::try_from(uri)
            .map_err(|e| Error::Generic(format!("Failed parsing to Payjoin URI: {}", e)))?;
        let uri = uri.require_network(self.wallet.network()).map_err(|e| {
            Error::Generic(format!("Failed setting the right network for the URI: {e}"))
        })?;
        let uri = uri
            .check_pj_supported()
            .map_err(|e| Error::Generic(format!("URI does not support Payjoin: {}", e)))?;

        let sats = uri
            .amount
            .ok_or_else(|| Error::Generic("Amount is not specified in the URI.".to_string()))?;

        let fee_rate = FeeRate::from_sat_per_vb(fee_rate).expect("Provided fee rate is not valid.");

        // Build and sign the original PSBT which pays to the receiver.
        let mut original_psbt = {
            let mut tx_builder = self.wallet.build_tx();
            tx_builder
                .add_recipient(uri.address.script_pubkey(), sats)
                .fee_rate(fee_rate);

            tx_builder.finish()?
        };
        if !self
            .wallet
            .sign(&mut original_psbt, SignOptions::default())?
        {
            return Err(Error::Generic(
                "Failed to sign and finalize the original PSBT.".to_string(),
            ));
        }

        let txid = match uri.extras.pj_param() {
            payjoin::PjParam::V1(_) => {
                let (req, ctx) = payjoin::send::v1::SenderBuilder::new(original_psbt.clone(), uri)
                    .build_recommended(fee_rate)?
                    .create_v1_post_request();

                let response = self.send_payjoin_post_request(req).await?;
                let psbt = ctx.process_response(&response.bytes().await?)?;

                self.process_payjoin_proposal(psbt, blockchain_client)
                    .await?
            }
            payjoin::PjParam::V2(_) => {
                let ohttp_relays: Vec<url::Url> = ohttp_relays
                    .into_iter()
                    .map(|s| url::Url::parse(&s))
                    .collect::<Result<_, _>>()
                    .map_err(|e| {
                        Error::Generic(format!("Failed to parse one or more OHTTP URLs: {e}"))
                    })?;

                if ohttp_relays.is_empty() {
                    return Err(Error::Generic(
                        "At least one valid OHTTP relay must be provided.".into(),
                    ));
                }

                // TODO: Implement proper persister.
                let persister =
                    payjoin::persist::NoopSessionPersister::<SenderSessionEvent>::default();

                let sender = payjoin::send::v2::SenderBuilder::new(original_psbt.clone(), uri)
                    .build_recommended(fee_rate)?
                    .save(&persister)
                    .map_err(|e| {
                        Error::Generic(format!(
                            "Failed to save the Payjoin v2 sender in the persister: {e}"
                        ))
                    })?;

                let selected_relay =
                    fetch_ohttp_keys(ohttp_relays, &sender.endpoint(), self.relay_manager.clone())
                        .await?
                        .relay_url;

                self.proceed_sender_session(
                    SendSession::WithReplyKey(sender),
                    &persister,
                    selected_relay.to_string(),
                    blockchain_client,
                )
                .await?
            }
            _ => {
                unimplemented!("Payjoin version not recognized.");
            }
        };

        Ok(to_string_pretty(&json!({ "txid": txid }))?)
    }

    /// Continues a receiver session from whichever state `session` is in.
    ///
    /// Each state is handed to the method responsible for it. `relay` is only used by the
    /// `Initialized` state, and `max_fee_rate` only by the states that precede the provisional
    /// proposal. A `Closed` session is reported as an error.
    async fn proceed_receiver_session(
        &mut self,
        session: ReceiveSession,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
        relay: impl payjoin::IntoUrl,
        max_fee_rate: FeeRate,
        blockchain_client: &BlockchainClient,
    ) -> Result<(), Error> {
        match session {
            ReceiveSession::Initialized(receiver) => {
                self.read_from_directory(
                    receiver,
                    persister,
                    relay,
                    max_fee_rate,
                    blockchain_client,
                )
                .await
            }
            ReceiveSession::UncheckedOriginalPayload(receiver) => {
                self.check_proposal(receiver, persister, max_fee_rate, blockchain_client)
                    .await
            }
            ReceiveSession::MaybeInputsOwned(receiver) => {
                self.check_inputs_not_owned(receiver, persister, max_fee_rate, blockchain_client)
                    .await
            }
            ReceiveSession::MaybeInputsSeen(receiver) => {
                self.check_no_inputs_seen_before(
                    receiver,
                    persister,
                    max_fee_rate,
                    blockchain_client,
                )
                .await
            }
            ReceiveSession::OutputsUnknown(receiver) => {
                self.identify_receiver_outputs(receiver, persister, max_fee_rate, blockchain_client)
                    .await
            }
            ReceiveSession::WantsOutputs(receiver) => {
                self.commit_outputs(receiver, persister, max_fee_rate, blockchain_client)
                    .await
            }
            ReceiveSession::WantsInputs(receiver) => {
                self.contribute_inputs(receiver, persister, max_fee_rate, blockchain_client)
                    .await
            }
            ReceiveSession::WantsFeeRange(receiver) => {
                self.apply_fee_range(receiver, persister, max_fee_rate, blockchain_client)
                    .await
            }
            ReceiveSession::ProvisionalProposal(receiver) => {
                self.finalize_proposal(receiver, persister, blockchain_client)
                    .await
            }
            ReceiveSession::PayjoinProposal(receiver) => {
                self.send_payjoin_proposal(receiver, persister, blockchain_client)
                    .await
            }
            ReceiveSession::Monitor(receiver) => {
                self.monitor_payjoin_proposal(receiver, persister, blockchain_client)
                    .await
            }
            ReceiveSession::HasReplyableError(receiver) => {
                self.handle_error(receiver, persister).await
            }
            ReceiveSession::Closed(_) => Err(Error::Generic("Session closed".to_string())),
        }
    }

    /// Polls the directory through `relay` until the sender's original proposal arrives.
    ///
    /// Every iteration posts a poll request and feeds the response into the receiver. While
    /// the receiver stays in the same state the loop keeps polling; once it progresses, the
    /// session continues with [`PayjoinManager::check_proposal`].
    ///
    /// # Errors
    ///
    /// Returns an error when a request cannot be created or sent, the response body cannot be
    /// read, or processing/saving the response fails.
    async fn read_from_directory(
        &mut self,
        receiver: Receiver<Initialized>,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
        relay: impl payjoin::IntoUrl,
        max_fee_rate: FeeRate,
        blockchain_client: &BlockchainClient,
    ) -> Result<(), Error> {
        let mut polling_receiver = receiver;
        let unchecked_payload = loop {
            let (req, context) = polling_receiver.create_poll_request(relay.as_str())?;
            let response = self.send_payjoin_post_request(req).await?;
            // The body is borrowed as a byte slice directly; no intermediate `Vec` is needed.
            let body = response.bytes().await?;
            let state_transition = polling_receiver
                .process_response(&body, context)
                .save(persister);
            match state_transition {
                Ok(OptionalTransitionOutcome::Progress(next_state)) => {
                    println!("Got a request from the sender. Responding with a Payjoin proposal.");
                    break next_state;
                }
                Ok(OptionalTransitionOutcome::Stasis(same_state)) => {
                    // Nothing from the sender yet: poll again with the unchanged receiver.
                    polling_receiver = same_state;
                }
                Err(e) => {
                    return Err(Error::Generic(format!(
                        "Error occurred when polling for Payjoin proposal from the directory: {e}"
                    )));
                }
            }
        };

        self.check_proposal(
            unchecked_payload,
            persister,
            max_fee_rate,
            blockchain_client,
        )
        .await
    }

    /// Accepts the original proposal without checking whether it could be broadcast.
    ///
    /// The receiver is treated as interactive, the fallback transaction is printed as hex so
    /// that it can be used manually if the Payjoin fails, and the session continues with
    /// [`PayjoinManager::check_inputs_not_owned`].
    async fn check_proposal(
        &mut self,
        receiver: Receiver<UncheckedOriginalPayload>,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
        max_fee_rate: FeeRate,
        blockchain_client: &BlockchainClient,
    ) -> Result<(), Error> {
        let transition = receiver.assume_interactive_receiver();
        let maybe_inputs_owned = transition.save(persister).map_err(|e| {
            Error::Generic(format!(
                "Error occurred when saving after assuming interactive receiver and not checking proposal broadcastability: {e}"
            ))
        })?;

        println!(
            "Checking whether the original proposal can be broadcasted itself is not supported. If the Payjoin fails, manually fall back to the transaction below."
        );
        let fallback_tx = maybe_inputs_owned.extract_tx_to_schedule_broadcast();
        println!("{}", serialize_hex(&fallback_tx));

        self.check_inputs_not_owned(
            maybe_inputs_owned,
            persister,
            max_fee_rate,
            blockchain_client,
        )
        .await
    }

    /// Checks the inputs of the original proposal against the wallet's own scripts.
    ///
    /// The callback reports whether a script belongs to the wallet. On success the session
    /// continues with [`PayjoinManager::check_no_inputs_seen_before`].
    async fn check_inputs_not_owned(
        &mut self,
        receiver: Receiver<MaybeInputsOwned>,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
        max_fee_rate: FeeRate,
        blockchain_client: &BlockchainClient,
    ) -> Result<(), Error> {
        let maybe_inputs_seen = receiver
            .check_inputs_not_owned(&mut |script| {
                let owned_by_wallet = self.wallet.is_mine(script.to_owned());
                Ok(owned_by_wallet)
            })
            .save(persister)
            .map_err(|e| {
                Error::Generic(format!("Error occurred when saving after checking if inputs in the original proposal are not owned: {e}"))
            })?;

        self.check_no_inputs_seen_before(
            maybe_inputs_seen,
            persister,
            max_fee_rate,
            blockchain_client,
        )
        .await
    }

    /// Skips the "inputs seen before" check and moves on to output identification.
    ///
    /// BDK CLI keeps no record of previous Payjoin attempts yet, so the callback always
    /// answers that an input was not seen before. Should the BDK or Payjoin persister gain such
    /// support, the check can be implemented; for the CLI use cases the missing protection
    /// against probing attacks is not considered a concern.
    async fn check_no_inputs_seen_before(
        &mut self,
        receiver: Receiver<MaybeInputsSeen>,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
        max_fee_rate: FeeRate,
        blockchain_client: &BlockchainClient,
    ) -> Result<(), Error> {
        println!(
            "Checking whether the inputs in the proposal were seen before to protect from probing attacks is not supported. Skipping the check..."
        );

        let outputs_unknown = receiver
            .check_no_inputs_seen_before(&mut |_| Ok(false))
            .save(persister)
            .map_err(|e| {
                Error::Generic(format!("Error occurred when saving after checking if the inputs in the proposal were seen before: {e}"))
            })?;

        self.identify_receiver_outputs(
            outputs_unknown,
            persister,
            max_fee_rate,
            blockchain_client,
        )
        .await
    }

    /// Identifies which outputs of the original proposal pay to this wallet.
    ///
    /// The callback reports whether an output script belongs to the wallet. On success the
    /// session continues with [`PayjoinManager::commit_outputs`].
    async fn identify_receiver_outputs(
        &mut self,
        receiver: Receiver<OutputsUnknown>,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
        max_fee_rate: FeeRate,
        blockchain_client: &BlockchainClient,
    ) -> Result<(), Error> {
        let wants_outputs = receiver
            .identify_receiver_outputs(&mut |script| {
                let owned_by_wallet = self.wallet.is_mine(script.to_owned());
                Ok(owned_by_wallet)
            })
            .save(persister)
            .map_err(|e| {
                Error::Generic(format!("Error occurred when saving after checking if the outputs in the original proposal are owned by the receiver: {e}"))
            })?;

        self.commit_outputs(wants_outputs, persister, max_fee_rate, blockchain_client)
            .await
    }

    /// Commits to the outputs exactly as the sender proposed them.
    ///
    /// This state would allow the receiver to modify its own outputs. This implementation is
    /// deliberately simple and keeps the outputs of the original proposal untouched, then
    /// continues with [`PayjoinManager::contribute_inputs`].
    async fn commit_outputs(
        &mut self,
        receiver: Receiver<WantsOutputs>,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
        max_fee_rate: FeeRate,
        blockchain_client: &BlockchainClient,
    ) -> Result<(), Error> {
        let transition = receiver.commit_outputs();
        let wants_inputs = transition.save(persister).map_err(|e| {
            Error::Generic(format!(
                "Error occurred when saving after committing to the outputs in the proposal: {e}"
            ))
        })?;

        self.contribute_inputs(wants_inputs, persister, max_fee_rate, blockchain_client)
            .await
    }

    async fn contribute_inputs(
        &mut self,
        receiver: Receiver<WantsInputs>,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
        max_fee_rate: FeeRate,
        blockchain_client: &BlockchainClient,
    ) -> Result<(), Error> {
        let candidate_inputs: Vec<InputPair> = self
            .wallet
            .list_unspent()
            .map(|output| {
                let psbtin = self
                    .wallet
                    .get_psbt_input(output.clone(), None, false)
                    .expect(
                        "Failed to get the PSBT Input using the output of the unspent transaction",
                    );
                let txin = TxIn {
                    previous_output: output.outpoint,
                    ..Default::default()
                };
                InputPair::new(txin, psbtin, None)
                    .expect("Failed to create InputPair when contributing outputs to the proposal")
            })
            .collect();
        let selected_input = receiver.try_preserving_privacy(candidate_inputs)?;

        let next_receiver_typestate = receiver.contribute_inputs(vec![selected_input])?
            .commit_inputs().save(persister)
            .map_err(|e| {
                Error::Generic(format!("Error occurred when saving after committing to the inputs after receiver contribution: {e}"))
            })?;

        self.apply_fee_range(
            next_receiver_typestate,
            persister,
            max_fee_rate,
            blockchain_client,
        )
        .await
    }

    /// Applies the receiver's fee range to the proposal.
    ///
    /// No minimum fee rate is given; `max_fee_rate` is passed as the upper bound. The session
    /// then continues with [`PayjoinManager::finalize_proposal`].
    async fn apply_fee_range(
        &mut self,
        receiver: Receiver<WantsFeeRange>,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
        max_fee_rate: FeeRate,
        blockchain_client: &BlockchainClient,
    ) -> Result<(), Error> {
        let transition = receiver.apply_fee_range(None, Some(max_fee_rate));
        let provisional_proposal = transition.save(persister).map_err(|e| {
            Error::Generic(format!("Error occurred when saving after applying the receiver fee range to the transaction: {e}"))
        })?;

        self.finalize_proposal(provisional_proposal, persister, blockchain_client)
            .await
    }

    /// Signs the receiver's part of the Payjoin proposal with the wallet.
    ///
    /// The signing callback works on a copy of the PSBT and returns the signed copy. On
    /// success the session continues with [`PayjoinManager::send_payjoin_proposal`].
    ///
    /// # Errors
    ///
    /// Returns an error when signing fails or when the new state cannot be saved.
    async fn finalize_proposal(
        &mut self,
        receiver: Receiver<ProvisionalProposal>,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
        blockchain_client: &BlockchainClient,
    ) -> Result<(), Error> {
        let next_receiver_typestate = receiver
            .finalize_proposal(|psbt| {
                let mut psbt_clone = psbt.clone();

                // We cannot finalize the transaction for broadcasting as it does not have the
                // sender signatures yet. Hence, the "finalized" flag returned by `sign` is
                // deliberately ignored and only an `Err` is acted upon.
                let _finalized = self
                    .wallet
                    .sign(&mut psbt_clone, SignOptions::default())
                    .map_err(|e| {
                        ImplementationError::from(
                            format!("Error occurred when signing the Payjoin PSBT: {e}").as_str(),
                        )
                    })?;

                Ok(psbt_clone)
            })
            .save(persister)
            .map_err(|e| {
                Error::Generic(format!(
                    "Error occurred when saving after signing the Payjoin proposal: {e}"
                ))
            })?;

        self.send_payjoin_proposal(next_receiver_typestate, persister, blockchain_client)
            .await
    }

    /// Posts the finalized Payjoin proposal through the selected relay.
    ///
    /// After the response has been processed and saved, the TXID of the proposal is printed
    /// and the session continues with [`PayjoinManager::monitor_payjoin_proposal`].
    ///
    /// # Panics
    ///
    /// Panics when the relay manager lock is poisoned or when no relay has been selected.
    async fn send_payjoin_proposal(
        &mut self,
        receiver: Receiver<PayjoinProposal>,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
        blockchain_client: &BlockchainClient,
    ) -> Result<(), Error> {
        // Copy the relay URL out so that the lock is released before any request is made.
        let selected_relay = self
            .relay_manager
            .lock()
            .expect("Lock should not be poisoned")
            .get_selected_relay()
            .expect("A relay should already be selected")
            .as_str()
            .to_owned();

        let (req, ctx) = receiver
            .create_post_request(selected_relay.as_str())
            .map_err(|e| {
                Error::Generic(format!("Error occurred when creating a post request for sending final Payjoin proposal: {e}"))
            })?;

        let res = self.send_payjoin_post_request(req).await?;
        // Keep a copy of the PSBT: processing the response consumes the receiver.
        let payjoin_psbt = receiver.psbt().clone();
        let body = res.bytes().await?;
        let monitor = receiver
            .process_response(&body, ctx)
            .save(persister)
            .map_err(|e| {
                Error::Generic(format!("Error occurred when saving after processing the response to the Payjoin proposal send: {e}"))
            })?;

        let txid = payjoin_psbt.extract_tx_unchecked_fee_rate().compute_txid();
        println!("Response successful. TXID: {}", txid);

        self.monitor_payjoin_proposal(monitor, persister, blockchain_client)
            .await
    }

    /// Waits for the sender to broadcast the Payjoin transaction.
    ///
    /// The wallet is synced once up front and then again every 3 seconds, while the payment
    /// check runs every 200 milliseconds against the wallet's view of the chain. The whole wait
    /// is bounded by a 15 second timeout. [`sync_wallet`] takes the `BlockchainClient` by
    /// reference, which is what allows calling it repeatedly from the loop.
    ///
    /// # Errors
    ///
    /// Returns an error when a wallet sync fails or when the timeout elapses before the
    /// transaction is detected. Errors from the payment check itself (including errors while
    /// saving its outcome) are ignored and polling simply continues.
    async fn monitor_payjoin_proposal(
        &mut self,
        receiver: Receiver<Monitor>,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
        blockchain_client: &BlockchainClient,
    ) -> Result<(), Error> {
        let poll_interval = tokio::time::Duration::from_millis(200);
        let sync_interval = tokio::time::Duration::from_secs(3);
        let timeout_duration = tokio::time::Duration::from_secs(15);

        println!(
            "Polling for Payjoin transaction broadcast. This may take up to {} seconds...",
            timeout_duration.as_secs()
        );
        let result = tokio::time::timeout(timeout_duration, async {
            let mut poll_timer = tokio::time::interval(poll_interval);
            let mut sync_timer = tokio::time::interval(sync_interval);
            // The first tick of a tokio interval completes immediately; both are consumed here
            // so that the loop below only wakes up after a full period has passed.
            poll_timer.tick().await;
            sync_timer.tick().await;
            // Initial sync, so that the first payment checks do not run on stale wallet data.
            sync_wallet(blockchain_client, self.wallet).await?;

            loop {
                tokio::select! {
                    _ = poll_timer.tick() => {
                        // Time to check payment
                        let check_result = receiver
                            .check_payment(
                                // Transaction lookup: hand back the transaction only when the
                                // wallet knows it and it is either confirmed or unconfirmed
                                // with a recorded `first_seen`.
                                |txid| {
                                    let Some(tx_details) = self.wallet.tx_details(txid) else {
                                        return Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"));
                                    };

                                    let is_seen = matches!(tx_details.chain_position, bdk_wallet::chain::ChainPosition::Confirmed { .. } | bdk_wallet::chain::ChainPosition::Unconfirmed { first_seen: Some(_), .. });

                                    if is_seen {
                                        return Ok(Some(tx_details.tx.as_ref().clone()));
                                    }
                                Err(ImplementationError::from("Cannot find the transaction in the mempool or the blockchain"))
                                },
                                // Outpoint lookup: answers `true` when the wallet has no UTXO
                                // for the outpoint and `false` when it does.
                                // NOTE(review): presumably this tells the library whether the
                                // outpoint has been spent - confirm against the payjoin docs.
                                |outpoint| {
                                    let utxo = self.wallet.get_utxo(outpoint);
                                    match utxo {
                                        Some(_) => Ok(false),
                                        None => Ok(true),
                                    }
                                }
                            )
                            .save(persister)
                            .map_err(|e| {
                                Error::Generic(format!("Error occurred when saving after checking that sender has broadcasted the Payjoin transaction: {e}"))
                            });

                        // Only a state transition ends the wait successfully.
                        if let Ok(OptionalTransitionOutcome::Progress(_)) = check_result {
                            println!("Payjoin transaction detected in the mempool!");
                            return Ok(());
                        }
                        // For Stasis or Err, continue polling (implicit - falls through to next loop iteration)
                    }
                    _ = sync_timer.tick() => {
                        // Time to sync wallet; a sync failure aborts the wait with that error.
                        sync_wallet(blockchain_client, self.wallet).await?;
                    }
                }
            }
        })
        .await;

        // The outer `Err` is the timeout; the inner result comes from the polling loop.
        match result {
            Ok(ok) => ok,
            Err(_) => Err(Error::Generic(format!(
                "Timeout waiting for Payjoin transaction broadcast after {:?}. Check the state of the transaction manually after running the sync command.",
                timeout_duration
            ))),
        }
    }

    async fn handle_error(
        &self,
        receiver: Receiver<HasReplyableError>,
        persister: &impl SessionPersister<SessionEvent = ReceiverSessionEvent>,
    ) -> Result<(), Error> {
        let (err_req, err_ctx) = receiver
            .create_error_request(
                self.relay_manager
                    .lock()
                    .expect("Lock should not be poisoned")
                    .get_selected_relay()
                    .expect("A relay should already be selected")
                    .as_str(),
            )
            .map_err(|e| {
                Error::Generic(format!(
                    "Error occurred when creating a receiver error request: {}",
                    e
                ))
            })?;

        let err_response = match self.send_payjoin_post_request(err_req).await {
            Ok(response) => response,
            Err(e) => {
                return Err(Error::Generic(format!(
                    "Failed to post error request: {}",
                    e
                )));
            }
        };

        let err_bytes = match err_response.bytes().await {
            Ok(bytes) => bytes,
            Err(e) => {
                return Err(Error::Generic(format!(
                    "Failed to get error response bytes: {}",
                    e
                )));
            }
        };

        if let Err(e) = receiver
            .process_error_response(&err_bytes, err_ctx)
            .save(persister)
        {
            return Err(Error::Generic(format!(
                "Failed to process error response: {}",
                e
            )));
        }

        Ok(())
    }

    /// Continues a sender session from whichever state `session` is in.
    ///
    /// A session holding a reply key posts the original proposal, a polling session keeps
    /// polling for the receiver's proposal, and a successfully closed session goes straight to
    /// signing and broadcasting. Any other state is reported as an error.
    ///
    /// Returns the TXID of the broadcast transaction.
    async fn proceed_sender_session(
        &self,
        session: SendSession,
        persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
        relay: impl payjoin::IntoUrl,
        blockchain_client: &BlockchainClient,
    ) -> Result<Txid, Error> {
        match session {
            SendSession::WithReplyKey(sender) => {
                self.post_original_proposal(sender, relay, persister, blockchain_client)
                    .await
            }
            SendSession::PollingForProposal(sender) => {
                self.get_proposed_payjoin_proposal(sender, relay, persister, blockchain_client)
                    .await
            }
            SendSession::Closed(SenderSessionOutcome::Success(proposal_psbt)) => {
                self.process_payjoin_proposal(proposal_psbt, blockchain_client)
                    .await
            }
            _ => Err(Error::Generic("Unexpected SendSession state!".to_string())),
        }
    }

    /// Posts the original proposal through `relay` and then starts polling for the answer.
    ///
    /// Once the response has been processed and saved, the session continues with
    /// [`PayjoinManager::get_proposed_payjoin_proposal`].
    async fn post_original_proposal(
        &self,
        sender: Sender<WithReplyKey>,
        relay: impl payjoin::IntoUrl,
        persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
        blockchain_client: &BlockchainClient,
    ) -> Result<Txid, Error> {
        let (req, ctx) = sender.create_v2_post_request(relay.as_str())?;
        let response = self.send_payjoin_post_request(req).await?;
        let body = response.bytes().await?;

        let polling_sender = sender
            .process_response(&body, ctx)
            .save(persister)
            .map_err(|e| {
                Error::Generic(format!("Failed to persist the Payjoin send after successfully sending original proposal: {e}"))
            })?;

        self.get_proposed_payjoin_proposal(polling_sender, relay, persister, blockchain_client)
            .await
    }

    async fn get_proposed_payjoin_proposal(
        &self,
        sender: Sender<PollingForProposal>,
        relay: impl payjoin::IntoUrl,
        persister: &impl SessionPersister<SessionEvent = SenderSessionEvent>,
        blockchain_client: &BlockchainClient,
    ) -> Result<Txid, Error> {
        let mut sender = sender.clone();
        loop {
            let (req, ctx) = sender.create_poll_request(relay.as_str())?;
            let response = self.send_payjoin_post_request(req).await?;
            let processed_response = sender
                .process_response(&response.bytes().await?, ctx)
                .save(persister);
            match processed_response {
                Ok(OptionalTransitionOutcome::Progress(psbt)) => {
                    println!("Proposal received. Processing...");
                    return self.process_payjoin_proposal(psbt, blockchain_client).await;
                }
                Ok(OptionalTransitionOutcome::Stasis(current_state)) => {
                    println!("No response yet. Continuing polling...");
                    sender = current_state;
                    continue;
                }
                Err(e) => {
                    break Err(Error::Generic(format!(
                        "Error occurred when polling for Payjoin v2 proposal: {e}"
                    )));
                }
            }
        }
    }

    /// Signs the Payjoin proposal with the wallet and broadcasts the resulting transaction.
    ///
    /// Returns the result of [`broadcast_transaction`].
    ///
    /// # Errors
    ///
    /// Returns an error when signing fails, the PSBT is not finalized after signing, the
    /// transaction cannot be extracted, or the broadcast fails.
    async fn process_payjoin_proposal(
        &self,
        mut psbt: Psbt,
        blockchain_client: &BlockchainClient,
    ) -> Result<Txid, Error> {
        let finalized = self.wallet.sign(&mut psbt, SignOptions::default())?;
        if !finalized {
            return Err(Error::Generic(
                "Failed to sign and finalize the Payjoin proposal PSBT.".to_string(),
            ));
        }

        let payjoin_tx = psbt.extract_tx_fee_rate_limit()?;
        broadcast_transaction(blockchain_client, payjoin_tx).await
    }

    /// Sends `req` as an HTTP POST and returns the raw response.
    ///
    /// The URL, the `Content-Type` header and the body are all taken from the request. A new
    /// HTTP client is created for every call.
    async fn send_payjoin_post_request(
        &self,
        req: payjoin::Request,
    ) -> reqwest::Result<reqwest::Response> {
        let request = reqwest::Client::new()
            .post(req.url)
            .header("Content-Type", req.content_type)
            .body(req.body);

        request.send().await
    }
}