1use std::collections::HashMap;
6use std::iter;
7use std::borrow::Cow;
8use std::convert::Infallible;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use anyhow::Context;
13use ark::vtxo::VtxoValidationError;
14use bdk_esplora::esplora_client::Amount;
15use bip39::rand;
16use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
17use bitcoin::consensus::encode::{deserialize, serialize_hex};
18use bitcoin::hashes::Hash;
19use bitcoin::hex::DisplayHex;
20use bitcoin::key::Keypair;
21use bitcoin::secp256k1::schnorr;
22use futures::future::join_all;
23use futures::{Stream, StreamExt};
24use log::{debug, error, info, trace, warn};
25
26use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
27use ark::vtxo::Full;
28use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
29use ark::forfeit::HashLockedForfeitBundle;
30use ark::musig::{self, PublicNonce, SecretNonce};
31use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
32use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
33use bitcoin_ext::TxStatus;
34use server_rpc::{protos, ServerConnection, TryFromBytes};
35
36use crate::movement::manager::OnDropStatus;
37use crate::{Wallet, WalletVtxo, SECP, SUBSCRIBE_REQUEST_TIMEOUT};
38use crate::movement::{MovementId, MovementStatus};
39use crate::movement::update::MovementUpdate;
40use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
41
42const ROUND_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
45use crate::subsystem::{RoundMovement, Subsystem};
46
47
48const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct RoundParticipation {
54 #[serde(with = "ark::encode::serde::vec")]
55 pub inputs: Vec<Vtxo<Full>>,
56 pub outputs: Vec<VtxoRequest>,
59 #[serde(default, skip_serializing_if = "Option::is_none", with = "ark::encode::serde::opt")]
61 pub unblinded_mailbox_id: Option<ark::mailbox::MailboxIdentifier>,
62}
63
64impl RoundParticipation {
65 pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
66 let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
67 let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
68 let fee = input_amount - output_amount;
69 Ok(MovementUpdate::new()
70 .consumed_vtxos(&self.inputs)
71 .intended_balance(SignedAmount::ZERO)
72 .effective_balance( - fee.to_signed()?)
73 .fee(fee)
74 )
75 }
76}
77
78#[derive(Debug, Clone)]
79pub enum RoundStatus {
80 Confirmed {
82 funding_txid: Txid,
83 },
84 Unconfirmed {
86 funding_txid: Txid,
87 },
88 Pending,
90 Failed {
92 error: String,
93 },
94 Canceled,
96}
97
98impl RoundStatus {
99 pub fn is_final(&self) -> bool {
101 match self {
102 Self::Confirmed { .. } => true,
103 Self::Unconfirmed { .. } => false,
104 Self::Pending => false,
105 Self::Failed { .. } => true,
106 Self::Canceled => true,
107 }
108 }
109
110 pub fn is_success(&self) -> bool {
112 match self {
113 Self::Confirmed { .. } => true,
114 Self::Unconfirmed { .. } => true,
115 Self::Pending => false,
116 Self::Failed { .. } => false,
117 Self::Canceled => false,
118 }
119 }
120}
121
122pub struct RoundState {
135 pub(crate) done: bool,
137
138 pub(crate) participation: RoundParticipation,
140
141 pub(crate) flow: RoundFlowState,
143
144 pub(crate) new_vtxos: Vec<Vtxo<Full>>,
149
150 pub(crate) sent_forfeit_sigs: bool,
157
158 pub(crate) movement_id: Option<MovementId>,
160}
161
162impl RoundState {
163 fn new_interactive(
164 participation: RoundParticipation,
165 movement_id: Option<MovementId>,
166 ) -> Self {
167 Self {
168 participation,
169 movement_id,
170 flow: RoundFlowState::InteractivePending,
171 new_vtxos: Vec::new(),
172 sent_forfeit_sigs: false,
173 done: false,
174 }
175 }
176
177 fn new_delegated(
178 participation: RoundParticipation,
179 unlock_hash: UnlockHash,
180 movement_id: Option<MovementId>,
181 ) -> Self {
182 Self {
183 participation,
184 movement_id,
185 flow: RoundFlowState::NonInteractivePending { unlock_hash },
186 new_vtxos: Vec::new(),
187 sent_forfeit_sigs: false,
188 done: false,
189 }
190 }
191
192 pub fn participation(&self) -> &RoundParticipation {
194 &self.participation
195 }
196
197 pub fn unlock_hash(&self) -> Option<UnlockHash> {
199 match self.flow {
200 RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
201 RoundFlowState::InteractivePending => None,
202 RoundFlowState::InteractiveOngoing { .. } => None,
203 RoundFlowState::Failed { .. } => None,
204 RoundFlowState::Canceled => None,
205 RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
206 }
207 }
208
209 pub fn funding_tx(&self) -> Option<&Transaction> {
210 match self.flow {
211 RoundFlowState::NonInteractivePending { .. } => None,
212 RoundFlowState::InteractivePending => None,
213 RoundFlowState::InteractiveOngoing { .. } => None,
214 RoundFlowState::Failed { .. } => None,
215 RoundFlowState::Canceled => None,
216 RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
217 }
218 }
219
220 pub fn ongoing_participation(&self) -> bool {
222 match self.flow {
223 RoundFlowState::NonInteractivePending { .. } => false,
224 RoundFlowState::InteractivePending => true,
225 RoundFlowState::InteractiveOngoing { .. } => true,
226 RoundFlowState::Failed { .. } => false,
227 RoundFlowState::Canceled => false,
228 RoundFlowState::Finished { .. } => false,
229 }
230 }
231
232 pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
235 let ret = match self.flow {
236 RoundFlowState::NonInteractivePending { .. } => {
237 bail!("it is currently not yet possible to cancel pending delegated rounds");
239 },
240 RoundFlowState::Canceled => true,
241 RoundFlowState::Failed { .. } => true,
242 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
243 self.flow = RoundFlowState::Canceled;
244 true
245 },
246 RoundFlowState::Finished { .. } => false,
247 };
248 if ret {
249 persist_round_failure(wallet, &self.participation, self.movement_id).await
250 .context("failed to persist round failure for cancelation")?;
251 }
252 Ok(ret)
253 }
254
255 async fn try_start_attempt(
256 &mut self,
257 wallet: &Wallet,
258 attempt: &RoundAttempt,
259 ) {
260 if let RoundFlowState::InteractiveOngoing {
263 state: AttemptState::AwaitingUnsignedVtxoTree { ref cosign_keys, .. },
264 ..
265 } = self.flow {
266 if let Some(k) = cosign_keys.first() {
267 wallet.inner.round_secret_nonces.forget(&k.public_key());
268 }
269 }
270
271 match start_attempt(wallet, &self.participation, attempt).await {
272 Ok(state) => {
273 self.flow = RoundFlowState::InteractiveOngoing {
274 round_seq: attempt.round_seq,
275 attempt_seq: attempt.attempt_seq,
276 state: state,
277 };
278 },
279 Err(e) => {
280 self.flow = RoundFlowState::Failed {
281 error: format!("{:#}", e),
282 };
283 },
284 }
285 }
286
287 pub async fn process_event(
289 &mut self,
290 wallet: &Wallet,
291 event: &RoundEvent,
292 ) -> bool {
293 let _: Infallible = match self.flow {
294 RoundFlowState::InteractivePending => {
295 if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
296 trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
297 self.try_start_attempt(wallet, e).await;
298 return true;
299 } else {
300 trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
301 event.kind(), event.round_seq(), event.attempt_seq(),
302 );
303 return false;
304 }
305 },
306 RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
307 if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
310 warn!("Round {} failed by server", round_seq);
311 self.flow = RoundFlowState::Failed {
312 error: format!("round {} failed by server", round_seq),
313 };
314 return true;
315 }
316
317 if event.round_seq() > round_seq {
318 self.flow = RoundFlowState::Failed {
321 error: format!("round {} started while we were on {}",
322 event.round_seq(), round_seq,
323 ),
324 };
325 return true;
326 }
327
328 if event.attempt_seq() < attempt_seq {
329 trace!("ignoring replayed message from old attempt");
330 return false;
331 }
332
333 if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
334 trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
335 self.try_start_attempt(wallet, e).await;
336 return true;
337 }
338 trace!("Processing event {} for round attempt {}:{} in state {}",
339 event.kind(), round_seq, attempt_seq, state.kind(),
340 );
341
342 return match progress_attempt(state, wallet, &self.participation, event).await {
343 AttemptProgressResult::NotUpdated => false,
344 AttemptProgressResult::Updated { new_state } => {
345 *state = new_state;
346 true
347 },
348 AttemptProgressResult::Failed(e) => {
349 warn!("Round failed with error: {:#}", e);
350 self.flow = RoundFlowState::Failed {
351 error: format!("{:#}", e),
352 };
353 true
354 },
355 AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
356 self.new_vtxos = vtxos;
357 let funding_txid = funding_tx.compute_txid();
358 self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
359 if let Some(mid) = self.movement_id {
360 if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
361 warn!("Error updating the round funding txid: {:#}", e);
362 }
363 }
364 true
365 },
366 };
367 },
368 RoundFlowState::NonInteractivePending { .. }
369 | RoundFlowState::Finished { .. }
370 | RoundFlowState::Failed { .. }
371 | RoundFlowState::Canceled => return false,
372 };
373 }
374
375 pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
380 match self.flow {
381 RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
382 Ok(RoundStatus::Confirmed {
383 funding_txid: funding_tx.compute_txid(),
384 })
385 },
386
387 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
388 Ok(RoundStatus::Pending)
389 },
390 RoundFlowState::Failed { ref error } => {
391 persist_round_failure(wallet, &self.participation, self.movement_id).await
392 .context("failed to persist round failure")?;
393 Ok(RoundStatus::Failed { error: error.clone() })
394 },
395 RoundFlowState::Canceled => {
396 persist_round_failure(wallet, &self.participation, self.movement_id).await
397 .context("failed to persist round failure")?;
398 Ok(RoundStatus::Canceled)
399 },
400
401 RoundFlowState::NonInteractivePending { unlock_hash } => {
402 match progress_delegated(wallet, &self.participation, unlock_hash).await {
403 Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
404 Ok(HarkProgressResult::RoundNotFound) => {
405 self.handle_round_not_found(wallet).await
406 },
407 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
408 let funding_txid = funding_tx.compute_txid();
409 self.new_vtxos = new_vtxos;
410 self.flow = RoundFlowState::Finished {
411 funding_tx: funding_tx.clone(),
412 unlock_hash: unlock_hash,
413 };
414
415 persist_round_success(
416 wallet,
417 &self.participation,
418 self.movement_id,
419 &self.new_vtxos,
420 &funding_tx,
421 ).await.context("failed to store successful round in DB!")?;
422
423 self.done = true;
424
425 Ok(RoundStatus::Confirmed { funding_txid })
426 },
427 Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
428 if let Some(mid) = self.movement_id {
429 update_funding_txid(wallet, mid, funding_txid).await
430 .context("failed to update funding txid in DB")?;
431 }
432 Ok(RoundStatus::Unconfirmed { funding_txid })
433 },
434
435 Err(HarkForfeitError::Err(e)) => {
438 Err(e.context("error progressing delegated round"))
442 },
443 Err(HarkForfeitError::SentForfeits(e)) => {
444 self.sent_forfeit_sigs = true;
445 Err(e.context("error progressing delegated round \
446 after sending forfeit tx signatures"))
447 },
448 }
449 },
450 RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
452 let funding_txid = funding_tx.compute_txid();
453 let confirmed = check_funding_tx_confirmations(
454 wallet, funding_txid, &funding_tx,
455 ).await.context("error checking funding tx confirmations")?;
456 if !confirmed {
457 trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
458 return Ok(RoundStatus::Unconfirmed { funding_txid });
459 }
460
461 match hark_vtxo_swap(
462 wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
463 ).await {
464 Ok(()) => {
465 persist_round_success(
466 wallet,
467 &self.participation,
468 self.movement_id,
469 &self.new_vtxos,
470 &funding_tx,
471 ).await.context("failed to store successful round in DB!")?;
472
473 self.done = true;
474
475 Ok(RoundStatus::Confirmed { funding_txid })
476 },
477 Err(HarkForfeitError::Err(e)) => {
478 Err(e.context("error forfeiting VTXOs after round"))
479 },
480 Err(HarkForfeitError::SentForfeits(e)) => {
481 self.sent_forfeit_sigs = true;
482 Err(e.context("error after having signed and sent \
483 forfeit signatures to server"))
484 },
485 }
486 },
487 }
488 }
489
490 pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
493 if self.new_vtxos.is_empty() {
494 None
495 } else {
496 Some(&self.new_vtxos)
497 }
498 }
499
500 pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
503 match self.flow {
505 RoundFlowState::NonInteractivePending { .. }
506 | RoundFlowState::InteractivePending
507 | RoundFlowState::InteractiveOngoing { .. }
508 => {
509 &self.participation.inputs
510 },
511 RoundFlowState::Finished { .. } => if self.done {
512 &[]
514 } else {
515 &self.participation.inputs
516 },
517 RoundFlowState::Failed { .. }
518 | RoundFlowState::Canceled
519 => {
520 &[]
522 },
523 }
524 }
525
526 pub fn pending_balance(&self) -> Amount {
530 if self.done {
531 return Amount::ZERO;
532 }
533
534 match self.flow {
535 RoundFlowState::NonInteractivePending { .. }
536 | RoundFlowState::InteractivePending
537 | RoundFlowState::InteractiveOngoing { .. }
538 | RoundFlowState::Finished { .. }
539 => {
540 self.participation.outputs.iter().map(|o| o.amount).sum()
541 },
542 RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
543 Amount::ZERO
544 },
545 }
546 }
547
548 async fn handle_round_not_found(
555 &mut self,
556 wallet: &Wallet,
557 ) -> anyhow::Result<RoundStatus> {
558 info!("Server reports round participation not found (no forfeits sent)");
559 self.flow = RoundFlowState::Failed {
560 error: "server reports round participation not found".into(),
561 };
562 persist_round_failure(wallet, &self.participation, self.movement_id).await
563 .context("failed to persist round failure")?;
564
565 Ok(RoundStatus::Failed {
566 error: "server reports round participation not found".into(),
567 })
568 }
569}
570
571pub enum RoundFlowState {
576 NonInteractivePending {
578 unlock_hash: UnlockHash,
579 },
580
581 InteractivePending,
583 InteractiveOngoing {
585 round_seq: RoundSeq,
586 attempt_seq: usize,
587 state: AttemptState,
588 },
589
590 Finished {
592 funding_tx: Transaction,
593 unlock_hash: UnlockHash,
594 },
595
596 Failed {
598 error: String,
599 },
600
601 Canceled,
603}
604
605pub enum AttemptState {
610 AwaitingAttempt,
611 AwaitingUnsignedVtxoTree {
612 cosign_keys: Vec<Keypair>,
613 unlock_hash: UnlockHash,
614 },
615 AwaitingFinishedRound {
616 unsigned_round_tx: Transaction,
617 vtxos_spec: VtxoTreeSpec,
618 unlock_hash: UnlockHash,
619 },
620}
621
622impl AttemptState {
623 fn kind(&self) -> &'static str {
625 match self {
626 Self::AwaitingAttempt => "AwaitingAttempt",
627 Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
628 Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
629 }
630 }
631}
632
633enum AttemptProgressResult {
635 Finished {
636 funding_tx: Transaction,
637 vtxos: Vec<Vtxo<Full>>,
638 unlock_hash: UnlockHash,
639 },
640 Failed(anyhow::Error),
641 Updated {
647 new_state: AttemptState,
648 },
649 NotUpdated,
650}
651
652async fn start_attempt(
654 wallet: &Wallet,
655 participation: &RoundParticipation,
656 event: &RoundAttempt,
657) -> anyhow::Result<AttemptState> {
658 let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
659
660 let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
662 .take(participation.outputs.len())
663 .collect::<Vec<_>>();
664
665 let cosign_nonces = cosign_keys.iter()
668 .map(|key| {
669 let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
670 let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
671 for _ in 0..ark_info.nb_round_nonces {
672 let (s, p) = musig::nonce_pair(key);
673 secs.push(s);
674 pubs.push(p);
675 }
676 (secs, pubs)
677 })
678 .take(participation.outputs.len())
679 .collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
680
681
682 debug!("Submitting payment request with {} inputs and {} vtxo outputs",
684 participation.inputs.len(), participation.outputs.len(),
685 );
686
687 let unblinded_mailbox_id = wallet.mailbox_identifier();
689 let signed_reqs = participation.outputs.iter()
690 .zip(cosign_keys.iter())
691 .zip(cosign_nonces.iter())
692 .map(|((req, cosign_key), (_sec, pub_nonces))| {
693 SignedVtxoRequest {
694 vtxo: req.clone(),
695 cosign_pubkey: cosign_key.public_key(),
696 nonces: pub_nonces.clone(),
697 }
698 })
699 .collect::<Vec<_>>();
700
701 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
702 for vtxo in participation.inputs.iter() {
703 let keypair = wallet.get_vtxo_key(vtxo).await
704 .map_err(HarkForfeitError::Err)?;
705 input_vtxos.push(protos::InputVtxo {
706 vtxo_id: vtxo.id().to_bytes().to_vec(),
707 attestation: {
708 let attestation = RoundAttemptAttestation::new(
709 event.challenge, vtxo.id(), &signed_reqs, &keypair,
710 );
711 attestation.serialize()
712 },
713 });
714 }
715
716 wallet.register_vtxo_transactions_with_server(&participation.inputs).await
718 .map_err(HarkForfeitError::Err)?;
719
720 let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
721 input_vtxos: input_vtxos,
722 vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
723 #[allow(deprecated)]
724 offboard_requests: vec![],
725 unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
726 }).await.context("Ark server refused our payment submission")?;
727 let unlock_hash = UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?;
728
729 if let Some(k) = cosign_keys.first() {
732 wallet.inner.round_secret_nonces.stash(
733 k.public_key(),
734 cosign_nonces.into_iter().map(|(sec, _pub)| sec).collect(),
735 );
736 }
737
738 Ok(AttemptState::AwaitingUnsignedVtxoTree { unlock_hash, cosign_keys })
739}
740
741#[derive(Debug, thiserror::Error)]
743enum HarkForfeitError {
744 #[error("error after forfeits were sent")]
746 SentForfeits(#[source] anyhow::Error),
747 #[error("error before forfeits were sent")]
749 Err(#[source] anyhow::Error),
750}
751
752async fn hark_cosign_leaf(
753 wallet: &Wallet,
754 srv: &mut ServerConnection,
755 funding_tx: &Transaction,
756 vtxo: &mut Vtxo<Full>,
757) -> anyhow::Result<()> {
758 let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
759 .context("error fetching keypair").map_err(HarkForfeitError::Err)?
760 .with_context(|| format!(
761 "keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
762 ))?.1;
763 let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
764 let cosign_resp = srv.client.request_leaf_vtxo_cosign(
765 protos::LeafVtxoCosignRequest::from(cosign_req),
766 ).await
767 .with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
768 .into_inner().try_into()
769 .context("bad leaf vtxo cosign response")?;
770 ensure!(ctx.finalize(vtxo, cosign_resp),
771 "failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
772 );
773
774 Ok(())
775}
776
777async fn hark_vtxo_swap(
787 wallet: &Wallet,
788 participation: &RoundParticipation,
789 output_vtxos: &mut [Vtxo<Full>],
790 funding_tx: &Transaction,
791 unlock_hash: UnlockHash,
792) -> Result<(), HarkForfeitError> {
793 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
794
795 wallet.register_vtxo_transactions_with_server(&participation.inputs).await
797 .context("couldn't send our input vtxo transactions to server")
798 .map_err(HarkForfeitError::Err)?;
799
800 for vtxo in output_vtxos.iter_mut() {
802 hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
803 .map_err(HarkForfeitError::Err)?;
804 }
805
806 let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
809 unlock_hash: unlock_hash.to_byte_array().to_vec(),
810 vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
811 }).await
812 .context("request forfeits nonces call failed")
813 .map_err(HarkForfeitError::Err)?
814 .into_inner().public_nonces.into_iter()
815 .map(|b| musig::PublicNonce::from_bytes(b))
816 .collect::<Result<Vec<_>, _>>()
817 .context("invalid forfeit nonces")
818 .map_err(HarkForfeitError::Err)?;
819
820 if server_nonces.len() != participation.inputs.len() {
821 return Err(HarkForfeitError::Err(anyhow!(
822 "server sent {} nonce pairs, expected {}",
823 server_nonces.len(), participation.inputs.len(),
824 )));
825 }
826
827 let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
828 for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
829 let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
830 .ok().flatten().with_context(|| format!(
831 "failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
832 )).map_err(HarkForfeitError::Err)?.1;
833 forfeit_bundles.push(HashLockedForfeitBundle::new(
834 input, unlock_hash, &user_key, &nonces,
835 ))
836 }
837
838 let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
839 forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
840 }).await
841 .context("forfeit vtxos call failed")
842 .map_err(HarkForfeitError::SentForfeits)?
843 .into_inner().unlock_preimage.as_slice().try_into()
844 .context("invalid preimage length")
845 .map_err(HarkForfeitError::SentForfeits)?;
846
847 for vtxo in output_vtxos.iter_mut() {
848 if !vtxo.provide_unlock_preimage(preimage) {
849 return Err(HarkForfeitError::SentForfeits(anyhow!(
850 "invalid preimage {} for vtxo {} with supposed unlock hash {}",
851 preimage.as_hex(), vtxo.id(), unlock_hash,
852 )));
853 }
854
855 vtxo.validate(&funding_tx).with_context(|| format!(
857 "new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
858 )).map_err(HarkForfeitError::SentForfeits)?;
859 }
860
861 Ok(())
862}
863
864fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
865 match vtxo.validate(funding_tx) {
866 Err(VtxoValidationError::GenesisTransition {
867 genesis_idx, genesis_len, transition_kind, ..
868 }) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
869 Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
870 vtxo.serialize_hex(),
871 )),
872 Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
873 }
874}
875
876fn check_round_matches_participation(
877 part: &RoundParticipation,
878 new_vtxos: &[Vtxo<Full>],
879 funding_tx: &Transaction,
880) -> anyhow::Result<()> {
881 ensure!(new_vtxos.len() == part.outputs.len(),
882 "unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
883 );
884
885 for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
886 ensure!(vtxo.amount() == req.amount,
887 "unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
888 );
889 ensure!(*vtxo.policy() == req.policy,
890 "unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
891 );
892
893 check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
895 }
896
897 Ok(())
898}
899
900async fn check_funding_tx_confirmations(
910 wallet: &Wallet,
911 funding_txid: Txid,
912 funding_tx: &Transaction,
913) -> anyhow::Result<bool> {
914 let tip = wallet.inner.chain.tip().await.context("chain source error")?;
915 let conf_height = tip - wallet.inner.config.round_tx_required_confirmations + 1;
916 let tx_status = wallet.inner.chain.tx_status(funding_txid).await.context("chain source error")?;
917 trace!("Round funding tx {} confirmation status: {:?} (tip={})",
918 funding_txid, tx_status, tip,
919 );
920 match tx_status {
921 TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
922 TxStatus::Mempool | TxStatus::Confirmed(_) => {
923 if wallet.inner.config.round_tx_required_confirmations == 0 {
924 debug!("Accepting round funding tx without confirmations because of configuration");
925 Ok(true)
926 } else {
927 trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
928 Ok(false)
929 }
930 },
931 TxStatus::NotFound => {
932 if let Err(e) = wallet.inner.chain.broadcast_tx(&funding_tx).await {
937 Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
938 funding_txid, serialize_hex(funding_tx), e,
939 ))
940 } else {
941 trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
942 Ok(false)
943 }
944 },
945 }
946}
947
948enum HarkProgressResult {
949 RoundPending,
950 RoundNotFound,
951 FundingTxUnconfirmed {
952 funding_txid: Txid,
953 },
954 Ok {
955 funding_tx: Transaction,
956 new_vtxos: Vec<Vtxo<Full>>,
957 },
958}
959
960async fn progress_delegated(
961 wallet: &Wallet,
962 participation: &RoundParticipation,
963 unlock_hash: UnlockHash,
964) -> Result<HarkProgressResult, HarkForfeitError> {
965 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
966
967 let resp = match srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
968 unlock_hash: unlock_hash.to_byte_array().to_vec(),
969 }).await {
970 Ok(resp) => resp.into_inner(),
971 Err(err) if err.code() == tonic::Code::NotFound => {
972 return Ok(HarkProgressResult::RoundNotFound);
973 },
974 Err(err) => {
975 return Err(HarkForfeitError::Err(
976 anyhow::Error::from(err).context("error checking round participation status"),
977 ));
978 },
979 };
980 let status = protos::RoundParticipationStatus::try_from(resp.status)
981 .context("unknown status from server")
982 .map_err(HarkForfeitError::Err) ?;
983
984 if status == protos::RoundParticipationStatus::RoundPartPending {
985 trace!("Hark round still pending");
986 return Ok(HarkProgressResult::RoundPending);
987 }
988
989 if status == protos::RoundParticipationStatus::RoundPartReleased {
994 let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
995 warn!("Server says preimage was already released for hArk participation \
996 with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
997 );
998 }
999
1000 let funding_tx_bytes = resp.round_funding_tx
1001 .context("funding txid should be provided when status is not pending")
1002 .map_err(HarkForfeitError::Err)?;
1003 let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
1004 .context("invalid funding txid")
1005 .map_err(HarkForfeitError::Err)?;
1006 let funding_txid = funding_tx.compute_txid();
1007 trace!("Funding tx for round participation with unlock hash {}: {} ({})",
1008 unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
1009 );
1010
1011 match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
1013 Ok(true) => {},
1014 Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
1015 Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
1016 }
1017
1018 let mut new_vtxos = resp.output_vtxos.into_iter()
1019 .map(|v| <Vtxo<Full>>::deserialize(&v))
1020 .collect::<Result<Vec<_>, _>>()
1021 .context("invalid output VTXOs from server")
1022 .map_err(HarkForfeitError::Err)?;
1023
1024 check_round_matches_participation(participation, &new_vtxos, &funding_tx)
1026 .context("new VTXOs received from server don't match our participation")
1027 .map_err(HarkForfeitError::Err)?;
1028
1029 hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
1030 .context("error forfeiting hArk VTXOs")
1031 .map_err(HarkForfeitError::SentForfeits)?;
1032
1033 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
1034}
1035
1036async fn progress_attempt(
1037 state: &mut AttemptState,
1038 wallet: &Wallet,
1039 part: &RoundParticipation,
1040 event: &RoundEvent,
1041) -> AttemptProgressResult {
1042 match (state, event) {
1046
1047 (
1048 AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, unlock_hash },
1049 RoundEvent::VtxoProposal(e),
1050 ) => {
1051 trace!("Received VtxoProposal: {:#?}", e);
1052
1053 let secret_nonces = if let Some(first) = cosign_keys.first() {
1056 match wallet.inner.round_secret_nonces.take(&first.public_key()) {
1057 Some(n) => n,
1058 None => return AttemptProgressResult::Failed(anyhow!(
1059 "secret cosign nonces unavailable (likely after a restart); \
1060 abandoning round attempt to avoid nonce reuse",
1061 )),
1062 }
1063 } else {
1064 vec![]
1065 };
1066
1067 match sign_vtxo_tree(
1068 wallet,
1069 part,
1070 &cosign_keys,
1071 secret_nonces,
1072 &e.unsigned_round_tx,
1073 &e.vtxos_spec,
1074 &e.cosign_agg_nonces,
1075 ).await {
1076 Ok(()) => {
1077 AttemptProgressResult::Updated {
1078 new_state: AttemptState::AwaitingFinishedRound {
1079 unsigned_round_tx: e.unsigned_round_tx.clone(),
1080 vtxos_spec: e.vtxos_spec.clone(),
1081 unlock_hash: *unlock_hash,
1082 },
1083 }
1084 },
1085 Err(e) => {
1086 trace!("Error signing VTXO tree: {:#}", e);
1087 AttemptProgressResult::Failed(e)
1088 },
1089 }
1090 },
1091
1092 (
1093 AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1094 RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1095 ) => {
1096 if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1097 return AttemptProgressResult::Failed(anyhow!(
1098 "signed funding tx ({}) doesn't match tx received before ({})",
1099 signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1100 ));
1101 }
1102
1103 if let Err(e) = wallet.inner.chain.broadcast_tx(&signed_round_tx).await {
1104 warn!("Failed to broadcast signed round tx: {:#}", e);
1105 }
1106
1107 match construct_new_vtxos(
1108 part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1109 ).await {
1110 Ok(v) => AttemptProgressResult::Finished {
1111 funding_tx: signed_round_tx.clone(),
1112 vtxos: v,
1113 unlock_hash: *unlock_hash,
1114 },
1115 Err(e) => AttemptProgressResult::Failed(anyhow!(
1116 "failed to construct new VTXOs for round: {:#}", e,
1117 )),
1118 }
1119 },
1120
1121 (state, RoundEvent::Finished(RoundFinished { .. })) => {
1122 AttemptProgressResult::Failed(anyhow!(
1123 "unexpectedly received a finished round while we were in state {}",
1124 state.kind(),
1125 ))
1126 },
1127
1128 (state, _) => {
1129 trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1130 AttemptProgressResult::NotUpdated
1131 },
1132 }
1133}
1134
1135async fn sign_vtxo_tree(
1136 wallet: &Wallet,
1137 participation: &RoundParticipation,
1138 cosign_keys: &[Keypair],
1139 secret_nonces: Vec<Vec<SecretNonce>>,
1140 unsigned_round_tx: &Transaction,
1141 vtxo_tree: &VtxoTreeSpec,
1142 cosign_agg_nonces: &[musig::AggregatedNonce],
1143) -> anyhow::Result<()> {
1144 let (mut srv, _) = wallet.require_server().await.context("server not available")?;
1145
1146 let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1147
1148 let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1150 for vtxo_req in vtxo_tree.iter_vtxos() {
1151 if let Some(i) = my_vtxos.iter().position(|v| {
1152 v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1153 }) {
1154 my_vtxos.swap_remove(i);
1155 }
1156 }
1157 if !my_vtxos.is_empty() {
1158 bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1159 }
1160
1161 let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1162 trace!("Sending vtxo signatures to server...");
1163 for ((req, key), sec) in participation.outputs.iter().zip(cosign_keys).zip(secret_nonces) {
1168 let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1169 let part_sigs = unsigned_vtxos.cosign_branch(
1170 &cosign_agg_nonces, leaf_idx, key, sec,
1171 ).context("failed to cosign branch: our request not part of tree")?;
1172
1173 info!("Sending {} partial vtxo cosign signatures for pk {}",
1174 part_sigs.len(), key.public_key(),
1175 );
1176
1177 srv.client.provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1178 pubkey: key.public_key().serialize().to_vec(),
1179 signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1180 }).await.context("error sending vtxo signatures")?;
1181 }
1182 trace!("Done sending vtxo signatures to server");
1183
1184 Ok(())
1185}
1186
1187async fn construct_new_vtxos(
1188 participation: &RoundParticipation,
1189 unsigned_round_tx: &Transaction,
1190 vtxo_tree: &VtxoTreeSpec,
1191 vtxo_cosign_sigs: &[schnorr::Signature],
1192) -> anyhow::Result<Vec<Vtxo<Full>>> {
1193 let round_txid = unsigned_round_tx.compute_txid();
1194 let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1195 let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1196
1197 if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1199 bail!("Received incorrect vtxo cosign signatures from server");
1201 }
1202
1203 let signed_vtxos = vtxo_tree
1204 .into_signed_tree(vtxo_cosign_sigs.to_vec())
1205 .into_cached_tree();
1206
1207 let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1208 let total_nb_expected_vtxos = expected_vtxos.len();
1209
1210 let mut new_vtxos = vec![];
1211 for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1212 if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1213 let vtxo = signed_vtxos.build_vtxo(idx);
1214
1215 check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1218 .context("constructed invalid vtxo from tree")?;
1219
1220 info!("New VTXO from round: {} ({}, {})",
1221 vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1222 );
1223
1224 new_vtxos.push(vtxo);
1225 expected_vtxos.swap_remove(expected_idx);
1226 }
1227 }
1228
1229 if !expected_vtxos.is_empty() {
1230 if expected_vtxos.len() == total_nb_expected_vtxos {
1231 bail!("None of our VTXOs were present in round!");
1233 } else {
1234 bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1235 expected_vtxos.len(), expected_vtxos,
1236 );
1237 }
1238 }
1239 Ok(new_vtxos)
1240}
1241
1242async fn persist_round_success(
1244 wallet: &Wallet,
1245 participation: &RoundParticipation,
1246 movement_id: Option<MovementId>,
1247 new_vtxos: &[Vtxo<Full>],
1248 funding_tx: &Transaction,
1249) -> anyhow::Result<()> {
1250 debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1251 new_vtxos.len(), movement_id,
1252 );
1253
1254 let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1258 .context("failed to store new VTXOs");
1259 let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1260 .context("failed to mark input VTXOs as spent");
1261 let update_result = if let Some(mid) = movement_id {
1262 wallet.inner.movements.finish_movement_with_update(
1263 mid,
1264 MovementStatus::Successful,
1265 MovementUpdate::new()
1266 .produced_vtxos(new_vtxos)
1267 .metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1268 ).await.context("failed to mark movement as finished")
1269 } else {
1270 Ok(())
1271 };
1272
1273 store_result?;
1274 spent_result?;
1275 update_result?;
1276
1277 Ok(())
1278}
1279
1280async fn persist_round_failure(
1281 wallet: &Wallet,
1282 participation: &RoundParticipation,
1283 movement_id: Option<MovementId>,
1284) -> anyhow::Result<()> {
1285 debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1286 let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1287 let finish_result = if let Some(movement_id) = movement_id {
1288 wallet.inner.movements.finish_movement(movement_id, MovementStatus::Failed).await
1289 } else {
1290 Ok(())
1291 };
1292 if let Err(e) = &finish_result {
1293 error!("Failed to mark movement as failed: {:#}", e);
1294 }
1295 match (unlock_result, finish_result) {
1296 (Ok(()), Ok(())) => Ok(()),
1297 (Err(e), _) => Err(e),
1298 (_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1299 }
1300}
1301
1302async fn update_funding_txid(
1303 wallet: &Wallet,
1304 movement_id: MovementId,
1305 funding_txid: Txid,
1306) -> anyhow::Result<()> {
1307 wallet.inner.movements.update_movement(
1308 movement_id,
1309 MovementUpdate::new()
1310 .metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1311 ).await.context("Unable to update funding txid of round")
1312}
1313
1314#[derive(Default)]
1323pub struct RoundSecretNonces {
1324 inner: parking_lot::Mutex<HashMap<bitcoin::secp256k1::PublicKey, Vec<Vec<SecretNonce>>>>,
1325}
1326
1327impl RoundSecretNonces {
1328 pub fn new() -> Self {
1329 Self { inner: parking_lot::Mutex::new(HashMap::new()) }
1330 }
1331
1332 pub fn stash(
1334 &self,
1335 first_cosign_pubkey: bitcoin::secp256k1::PublicKey,
1336 nonces: Vec<Vec<SecretNonce>>,
1337 ) {
1338 self.inner.lock().insert(first_cosign_pubkey, nonces);
1339 }
1340
1341 pub fn take(
1344 &self,
1345 first_cosign_pubkey: &bitcoin::secp256k1::PublicKey,
1346 ) -> Option<Vec<Vec<SecretNonce>>> {
1347 self.inner.lock().remove(first_cosign_pubkey)
1348 }
1349
1350 pub fn forget(&self, first_cosign_pubkey: &bitcoin::secp256k1::PublicKey) {
1354 self.inner.lock().remove(first_cosign_pubkey);
1355 }
1356}
1357
1358impl Wallet {
1359 pub async fn lock_wait_round_state(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState>> {
1364 let guard = self.inner.lock_manager.lock(
1365 &format!("{}.round.{}", self.fingerprint(), id),
1366 ROUND_LOCK_TIMEOUT,
1367 ).await.with_context(|| format!(
1368 "timed out waiting for lock on round state {} (wallet {})",
1369 id, self.fingerprint(),
1370 ))?;
1371
1372 if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1373 return Ok(Some(state.lock(guard)));
1374 }
1375
1376 Ok(None)
1377 }
1378
1379 pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1381 let (mut srv, _) = self.require_server().await?;
1382 let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1383 Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1384 }
1385
1386 pub async fn join_next_round(
1395 &self,
1396 participation: RoundParticipation,
1397 movement_kind: Option<RoundMovement>,
1398 ) -> anyhow::Result<StoredRoundState> {
1399 let movement = if let Some(kind) = movement_kind {
1400 Some(self.inner.movements.new_guarded_movement_with_update(
1401 Subsystem::ROUND,
1402 kind.to_string(),
1403 OnDropStatus::Failed,
1404 participation.to_movement_update()?
1405 ).await?)
1406 } else {
1407 None
1408 };
1409 let movement_id = movement.as_ref().map(|m| m.id());
1410 let input_vtxos = participation.inputs.iter().map(|v| v.id()).collect::<Vec<_>>();
1411 let state = RoundState::new_interactive(participation, movement_id);
1412
1413 self.lock_vtxos(&input_vtxos, movement_id.map(|m| m.into())).await
1414 .context("failed to lock input VTXOs")?;
1415
1416 match (async || {
1417 let id = self.inner.db.store_round_state(&state).await?;
1418 Ok(self.lock_wait_round_state(id).await?
1419 .context("failed to lock fresh round state")?)
1420 })().await {
1421 Ok(state) => {
1422 if let Some(mut m) = movement {
1423 m.stop();
1424 }
1425 Ok(state)
1426 },
1427 Err(e) => {
1428 self.unlock_vtxos(&input_vtxos).await
1429 .context("failed to unlock input VTXOs")?;
1430 if let Some(mut m) = movement {
1431 m.fail().await.context("failed to mark movement as failed")?;
1432 }
1433 Err(e)
1434 },
1435 }
1436 }
1437
1438 pub async fn join_next_round_delegated(
1440 &self,
1441 participation: RoundParticipation,
1442 movement_kind: Option<RoundMovement>,
1443 ) -> anyhow::Result<StoredRoundState<Unlocked>> {
1444 let movement = if let Some(kind) = movement_kind {
1445 Some(self.inner.movements.new_guarded_movement_with_update(
1446 Subsystem::ROUND,
1447 kind.to_string(),
1448 OnDropStatus::Failed,
1449 participation.to_movement_update()?,
1450 ).await?)
1451 } else {
1452 None
1453 };
1454 let movement_id = movement.as_ref().map(|m| m.id());
1455
1456 let input_ids = participation.inputs.iter().map(|v| v.id()).collect::<Vec<_>>();
1457 self.lock_vtxos(&input_ids, movement_id.map(|m| m.into())).await
1458 .context("error locking input VTXOs")?;
1459
1460 match self.join_next_round_delegated_inner(participation, movement_id).await {
1461 Ok(state) => {
1462 if let Some(mut m) = movement {
1463 m.stop();
1464 }
1465 Ok(state)
1466 },
1467 Err(e) => {
1468 self.unlock_vtxos(&input_ids).await
1469 .context("error unlocking input VTXOs")?;
1470 if let Some(mut m) = movement {
1471 m.fail().await.context("error marking movement as failed")?;
1472 }
1473 Err(e)
1474 },
1475 }
1476 }
1477
1478 async fn join_next_round_delegated_inner(
1479 &self,
1480 participation: RoundParticipation,
1481 movement_id: Option<MovementId>,
1482 ) -> anyhow::Result<StoredRoundState<Unlocked>> {
1483 let (mut srv, _) = self.require_server().await?;
1484
1485 let unblinded_mailbox_id = self.mailbox_identifier();
1487
1488 self.register_vtxo_transactions_with_server(&participation.inputs).await
1490 .context("failed to register input vtxo transactions with server")?;
1491
1492 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1494 for vtxo in participation.inputs.iter() {
1495 let keypair = self.get_vtxo_key(vtxo).await
1496 .context("failed to get vtxo keypair")?;
1497 input_vtxos.push(protos::InputVtxo {
1498 vtxo_id: vtxo.id().to_bytes().to_vec(),
1499 attestation: {
1500 let attestation = DelegatedRoundParticipationAttestation::new(
1501 vtxo.id(), &participation.outputs, &keypair,
1502 );
1503 attestation.serialize()
1504 },
1505 });
1506 }
1507
1508 let vtxo_requests = participation.outputs.iter()
1510 .map(|req|
1511 protos::VtxoRequest {
1512 policy: req.policy.serialize(),
1513 amount: req.amount.to_sat(),
1514 })
1515 .collect::<Vec<_>>();
1516
1517 let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1519 input_vtxos,
1520 vtxo_requests,
1521 unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
1522 }).await.context("error submitting round participation to server")?.into_inner();
1523
1524 let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1525 .context("invalid unlock hash from server")?;
1526
1527 let state = RoundState::new_delegated(participation, unlock_hash, movement_id);
1528
1529 info!("Delegated round participation submitted, it will automatically execute \
1530 when you next sync your wallet after the round happened \
1531 (and has sufficient confirmations).",
1532 );
1533
1534 let id = self.inner.db.store_round_state(&state).await?;
1535 Ok(StoredRoundState::new(id, state))
1536 }
1537
1538 pub(crate) async fn join_attempt_interactive(
1546 &self,
1547 participation: RoundParticipation,
1548 attempt: &RoundAttempt,
1549 movement_kind: Option<RoundMovement>,
1550 ) -> anyhow::Result<StoredRoundState<Unlocked>> {
1551 let movement = if let Some(kind) = movement_kind {
1552 Some(self.inner.movements.new_guarded_movement_with_update(
1553 Subsystem::ROUND,
1554 kind.to_string(),
1555 OnDropStatus::Failed,
1556 participation.to_movement_update()?,
1557 ).await?)
1558 } else {
1559 None
1560 };
1561 let movement_id = movement.as_ref().map(|m| m.id());
1562
1563 let input_ids = participation.inputs.iter().map(|v| v.id()).collect::<Vec<_>>();
1564 self.lock_vtxos(&input_ids, movement_id.map(|m| m.into())).await
1565 .context("error locking input VTXOs")?;
1566
1567 match self.join_attempt_interactive_inner(participation, attempt, movement_id).await {
1568 Ok(state) => {
1569 if let Some(mut m) = movement {
1570 m.stop();
1571 }
1572 Ok(state)
1573 },
1574 Err(e) => {
1575 self.unlock_vtxos(&input_ids).await
1576 .context("error unlocking input VTXOs")?;
1577 if let Some(mut m) = movement {
1578 m.fail().await.context("error marking movement as failed")?;
1579 }
1580 Err(e)
1581 },
1582 }
1583 }
1584
1585 async fn join_attempt_interactive_inner(
1586 &self,
1587 participation: RoundParticipation,
1588 attempt: &RoundAttempt,
1589 movement_id: Option<MovementId>,
1590 ) -> anyhow::Result<StoredRoundState<Unlocked>> {
1591 let attempt_state = start_attempt(self, &participation, attempt).await?;
1595
1596 let mut state = RoundState::new_interactive(participation, movement_id);
1597 state.flow = RoundFlowState::InteractiveOngoing {
1598 round_seq: attempt.round_seq,
1599 attempt_seq: attempt.attempt_seq,
1600 state: attempt_state,
1601 };
1602
1603 let id = self.inner.db.store_round_state(&state).await?;
1604 Ok(StoredRoundState::new(id, state))
1605 }
1606
1607 pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1609 self.inner.db.get_pending_round_state_ids().await
1610 }
1611
1612 pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1614 let ids = self.inner.db.get_pending_round_state_ids().await?;
1615 let mut states = Vec::with_capacity(ids.len());
1616 for id in ids {
1617 if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1618 states.push(state);
1619 }
1620 }
1621 Ok(states)
1622 }
1623
1624 pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1626 let mut ret = Amount::ZERO;
1627 for round in self.pending_round_states().await? {
1628 ret += round.state().pending_balance();
1629 }
1630 Ok(ret)
1631 }
1632
1633 pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1638 let mut ret = Vec::new();
1639 for round in self.pending_round_states().await? {
1640 let inputs = round.state().locked_pending_inputs();
1641 ret.reserve(inputs.len());
1642 for input in inputs {
1643 let v = self.get_vtxo_by_id(input.id()).await
1644 .context("unknown round input VTXO")?;
1645 ret.push(v);
1646 }
1647 }
1648 Ok(ret)
1649 }
1650
1651 pub async fn sync_pending_rounds(&self) -> anyhow::Result<HashMap<RoundStateId, RoundStatus>> {
1653 let states = self.pending_round_states().await?;
1654 if states.is_empty() {
1655 return Ok(HashMap::new());
1656 }
1657
1658 debug!("Syncing {} pending round states...", states.len());
1659
1660 let ret = Arc::new(parking_lot::Mutex::new(HashMap::with_capacity(states.len())));
1661 tokio_stream::iter(states).for_each_concurrent(10, |state| {
1662 let ret = ret.clone();
1663 async move {
1664 if state.state().ongoing_participation() {
1666 return;
1667 }
1668
1669 let mut state = match self.lock_wait_round_state(state.id()).await {
1670 Ok(Some(state)) => state,
1671 Ok(None) => return,
1672 Err(e) => {
1673 warn!("Error locking round state: {:#}", e);
1674 return;
1675 },
1676 };
1677
1678 let status = match state.state_mut().sync(self).await {
1679 Ok(s) => s,
1680 Err(e) => {
1681 warn!("Error syncing round: {:#}", e);
1682 return;
1683 },
1684 };
1685 trace!("Synced round #{}, status: {:?}", state.id(), status);
1686 match status {
1687 RoundStatus::Confirmed { funding_txid } => {
1688 info!("Round confirmed. Funding tx {}", funding_txid);
1689 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1690 warn!("Error removing confirmed round state from db: {:#}", e);
1691 }
1692 },
1693 RoundStatus::Unconfirmed { funding_txid } => {
1694 info!("Waiting for confirmations for round funding tx {}", funding_txid);
1695 if let Err(e) = self.inner.db.update_round_state(&state).await {
1696 warn!("Error updating pending round state in db: {:#}", e);
1697 }
1698 },
1699 RoundStatus::Pending => {
1700 if let Err(e) = self.inner.db.update_round_state(&state).await {
1701 warn!("Error updating pending round state in db: {:#}", e);
1702 }
1703 },
1704 RoundStatus::Failed { ref error } => {
1705 error!("Round failed: {}", error);
1706 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1707 warn!("Error removing failed round state from db: {:#}", e);
1708 }
1709 },
1710 RoundStatus::Canceled => {
1711 error!("Round canceled");
1712 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1713 warn!("Error removing canceled round state from db: {:#}", e);
1714 }
1715 },
1716 }
1717 ret.lock().insert(state.id(), status);
1718 }
1719 }).await;
1720
1721 Ok(Arc::into_inner(ret).expect("only ref left").into_inner())
1722 }
1723
1724 async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1726 let (mut srv, _) = self.require_server().await?;
1727 let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1728 Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1729 }
1730
1731 async fn inner_process_event(
1732 &self,
1733 state: &mut StoredRoundState,
1734 event: Option<&RoundEvent>,
1735 ) {
1736 if let Some(event) = event && state.state().ongoing_participation() {
1737 let updated = state.state_mut().process_event(self, &event).await;
1738 if updated {
1739 if let Err(e) = self.inner.db.update_round_state(&state).await {
1740 error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1741 }
1742 }
1743 }
1744
1745 match state.state_mut().sync(self).await {
1746 Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1747 Ok(s) if s.is_final() => {
1748 info!("Round #{} finished with result: {:?}", state.id(), s);
1749 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1750 warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1751 }
1752 },
1753 Ok(s) => {
1754 trace!("Round state #{} is now in state {:?}", state.id(), s);
1755 if let Err(e) = self.inner.db.update_round_state(&state).await {
1756 warn!("Error storing round state #{}: {:#}", state.id(), e);
1757 }
1758 },
1759 }
1760 }
1761
1762 pub async fn progress_pending_rounds(
1767 &self,
1768 last_round_event: Option<&RoundEvent>,
1769 ) -> anyhow::Result<()> {
1770 let states = self.pending_round_states().await?;
1771 if states.is_empty() {
1772 return Ok(());
1773 }
1774
1775 info!("Processing {} rounds...", states.len());
1776
1777 let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1778
1779 let has_ongoing_participation = states.iter()
1780 .any(|s| s.state().ongoing_participation());
1781 if has_ongoing_participation && last_round_event.is_none() {
1782 match self.get_last_round_event().await {
1783 Ok(e) => last_round_event = Some(Cow::Owned(e)),
1784 Err(e) => {
1785 warn!("Error fetching round event, \
1786 failed to progress ongoing rounds: {:#}", e);
1787 },
1788 }
1789 }
1790
1791 let event = last_round_event.as_ref().map(|c| c.as_ref());
1792
1793 let futs = states.into_iter().map(async |state| {
1794 let locked = self.lock_wait_round_state(state.id()).await?;
1795 if let Some(mut locked) = locked {
1796 self.inner_process_event(&mut locked, event).await;
1797 }
1798 Ok::<_, anyhow::Error>(())
1799 });
1800
1801 futures::future::join_all(futs).await;
1802
1803 Ok(())
1804 }
1805
1806 pub async fn subscribe_round_events(&self)
1807 -> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1808 {
1809 let (mut srv, _) = self.require_server().await?;
1810 let mut req = tonic::IntoRequest::into_request(protos::Empty {});
1811 req.set_timeout(SUBSCRIBE_REQUEST_TIMEOUT);
1812 let events = srv.client.subscribe_rounds(req).await?
1813 .into_inner().map(|m| {
1814 let m = m.context("received error on event stream")?;
1815 let e = RoundEvent::try_from(m.clone())
1816 .with_context(|| format!("error converting rpc round event: {:?}", m))?;
1817 trace!("Received round event: {}", e);
1818 Ok::<_, anyhow::Error>(e)
1819 });
1820 Ok(events)
1821 }
1822
1823 pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1828 let mut events = self.subscribe_round_events().await?;
1829
1830 loop {
1831 let state_ids = self.pending_round_states().await?.iter()
1834 .filter(|s| s.state().ongoing_participation())
1835 .map(|s| s.id())
1836 .collect::<Vec<_>>();
1837
1838 if state_ids.is_empty() {
1839 info!("All rounds handled");
1840 return Ok(());
1841 }
1842
1843 let event = events.next().await
1844 .context("events stream broke")?
1845 .context("error on event stream")?;
1846
1847 let futs = state_ids.into_iter().map(async |state| {
1848 let locked = self.lock_wait_round_state(state).await?;
1849 if let Some(mut locked) = locked {
1850 self.inner_process_event(&mut locked, Some(&event)).await;
1851 }
1852 Ok::<_, anyhow::Error>(())
1853 });
1854
1855 futures::future::join_all(futs).await;
1856 }
1857 }
1858
1859 pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1864 let state_ids = self.inner.db.get_pending_round_state_ids().await?;
1866
1867 let futures = state_ids.into_iter().map(|state_id| {
1868 async move {
1869 let mut state = match self.lock_wait_round_state(state_id).await {
1871 Ok(Some(s)) => s,
1872 Ok(None) => return,
1873 Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1874 };
1875
1876 match state.state_mut().try_cancel(self).await {
1877 Ok(true) => {
1878 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1879 warn!("Error removing canceled round state from db: {:#}", e);
1880 }
1881 },
1882 Ok(false) => {},
1883 Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1884 }
1885 }
1886 });
1887
1888 join_all(futures).await;
1889
1890 Ok(())
1891 }
1892
1893 pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1895 let mut state = self.lock_wait_round_state(id).await?
1896 .context("round state not found")?;
1897
1898 if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1899 self.inner.db.remove_round_state(&state).await
1900 .context("error removing canceled round state from db")?;
1901 } else {
1902 bail!("failed to cancel round");
1903 }
1904
1905 Ok(())
1906 }
1907
1908 pub(crate) async fn participate_round(
1915 &self,
1916 participation: RoundParticipation,
1917 movement_kind: Option<RoundMovement>,
1918 ) -> anyhow::Result<RoundStatus> {
1919 let state = self.join_next_round(participation, movement_kind).await?;
1920
1921 info!("Waiting for a round start...");
1922 let mut events = self.subscribe_round_events().await?;
1923
1924 self.drive_round_state(state, &mut events).await
1925 }
1926
1927 pub(crate) async fn drive_round_state<S>(
1935 &self,
1936 mut state: StoredRoundState,
1937 events: &mut S,
1938 ) -> anyhow::Result<RoundStatus>
1939 where
1940 S: Stream<Item = anyhow::Result<RoundEvent>> + Unpin,
1941 {
1942 loop {
1943 if !state.state().ongoing_participation() {
1944 let status = state.state_mut().sync(self).await?;
1945 match status {
1946 RoundStatus::Failed { error } => bail!("round failed: {}", error),
1947 RoundStatus::Canceled => bail!("round canceled"),
1948 status => return Ok(status),
1949 }
1950 }
1951
1952 let event = events.next().await
1953 .context("events stream broke")?
1954 .context("error on event stream")?;
1955 if state.state_mut().process_event(self, &event).await {
1956 self.inner.db.update_round_state(&state).await?;
1957 }
1958 }
1959 }
1960}
1961
1962#[cfg(test)]
1963mod test {
1964 use super::*;
1965
1966 use bitcoin::secp256k1::Secp256k1;
1967
1968 fn pubkey() -> bitcoin::secp256k1::PublicKey {
1969 let secp = Secp256k1::new();
1970 Keypair::new(&secp, &mut rand::thread_rng()).public_key()
1971 }
1972
1973 fn nonces() -> Vec<Vec<SecretNonce>> {
1974 let secp = Secp256k1::new();
1975 let key = Keypair::new(&secp, &mut rand::thread_rng());
1976 vec![vec![musig::nonce_pair(&key).0, musig::nonce_pair(&key).0]]
1980 }
1981
1982 #[test]
1983 fn stash_and_take() {
1984 let store = RoundSecretNonces::new();
1985 let k = pubkey();
1986 store.stash(k, nonces());
1987
1988 assert!(store.take(&k).is_some());
1989 }
1990
1991 #[test]
1992 fn cannot_take_twice() {
1993 let store = RoundSecretNonces::new();
1994 let k = pubkey();
1995 store.stash(k, nonces());
1996
1997 assert!(store.take(&k).is_some());
1998 assert!(store.take(&k).is_none());
1999 }
2000
2001 #[test]
2002 fn cannot_take_after_forget() {
2003 let store = RoundSecretNonces::new();
2004 let k = pubkey();
2005 store.stash(k, nonces());
2006 store.forget(&k);
2007
2008 assert!(store.take(&k).is_none());
2009 }
2010
2011 #[test]
2012 fn stash_overrides_stash() {
2013 let secp = Secp256k1::new();
2014 let key = Keypair::new(&secp, &mut rand::thread_rng());
2015 let nonces_1 = vec![vec![musig::nonce_pair(&key).0]];
2016 let nonces_2 = vec![];
2017
2018 let store = RoundSecretNonces::new();
2019 store.stash(key.public_key(), nonces_1);
2020 store.stash(key.public_key(), nonces_2);
2021
2022 let taken = store.take(&key.public_key()).expect("nonces present");
2023 assert_eq!(taken.len(), 0);
2024 }
2025}