Skip to main content

bark/round/
mod.rs

1//!
2//! Round State Machine
3//!
4
5use std::collections::HashMap;
6use std::iter;
7use std::borrow::Cow;
8use std::convert::Infallible;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use anyhow::Context;
13use ark::vtxo::VtxoValidationError;
14use bdk_esplora::esplora_client::Amount;
15use bip39::rand;
16use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
17use bitcoin::consensus::encode::{deserialize, serialize_hex};
18use bitcoin::hashes::Hash;
19use bitcoin::hex::DisplayHex;
20use bitcoin::key::Keypair;
21use bitcoin::secp256k1::schnorr;
22use futures::future::join_all;
23use futures::{Stream, StreamExt};
24use log::{debug, error, info, trace, warn};
25
26use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
27use ark::vtxo::Full;
28use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
29use ark::forfeit::HashLockedForfeitBundle;
30use ark::musig::{self, PublicNonce, SecretNonce};
31use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
32use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
33use bitcoin_ext::TxStatus;
34use server_rpc::{protos, ServerConnection, TryFromBytes};
35
36use crate::movement::manager::OnDropStatus;
37use crate::{Wallet, WalletVtxo, SECP, SUBSCRIBE_REQUEST_TIMEOUT};
38use crate::movement::{MovementId, MovementStatus};
39use crate::movement::update::MovementUpdate;
40use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
41
/// How long [`Wallet::lock_wait_round_state`] waits for a contended
/// round lock before giving up. Long enough to outlast a normal round.
//
// NOTE(review): that 10 seconds outlasts a normal round cannot be confirmed
// from this file -- verify against the server's actual round timing.
const ROUND_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
45use crate::subsystem::{RoundMovement, Subsystem};
46
47
/// The type string for the hArk leaf transition
///
/// `check_vtxo_fails_hash_lock` compares this against the `transition_kind`
/// reported by `VtxoValidationError::GenesisTransition` to recognise a VTXO
/// whose only validation failure is its last (hArk) transition.
const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
50
/// Struct to communicate your specific participation for an Ark round.
///
/// Derives serde `Serialize`/`Deserialize`; the inputs and the mailbox
/// identifier use the custom encoders from `ark::encode::serde`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoundParticipation {
	/// The input VTXOs that we bring into the round
	///
	/// These are recorded as consumed VTXOs in the movement and are the ones
	/// forfeit bundles are built for in `hark_vtxo_swap`.
	#[serde(with = "ark::encode::serde::vec")]
	pub inputs: Vec<Vtxo<Full>>,
	/// The output VTXOs that we request in the round,
	/// including change
	pub outputs: Vec<VtxoRequest>,
	/// Optional mailbox identifier for round completion notification
	///
	/// Omitted from the serialized form when `None`.
	//
	// NOTE(review): `start_attempt` in this file submits
	// `wallet.mailbox_identifier()` instead of this field -- confirm that is
	// intended.
	#[serde(default, skip_serializing_if = "Option::is_none", with = "ark::encode::serde::opt")]
	pub unblinded_mailbox_id: Option<ark::mailbox::MailboxIdentifier>,
}
63
64impl RoundParticipation {
65	pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
66		let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
67		let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
68		let fee = input_amount - output_amount;
69		Ok(MovementUpdate::new()
70			.consumed_vtxos(&self.inputs)
71			.intended_balance(SignedAmount::ZERO)
72			.effective_balance( - fee.to_signed()?)
73			.fee(fee)
74		)
75	}
76}
77
/// The status of a round participation as reported by [`RoundState::sync`]
#[derive(Debug, Clone)]
pub enum RoundStatus {
	/// The round was successful and is fully confirmed
	Confirmed {
		/// Txid of the round's funding transaction
		funding_txid: Txid,
	},
	/// Round successful but not fully confirmed
	Unconfirmed {
		/// Txid of the round's funding transaction
		funding_txid: Txid,
	},
	/// Round didn't finish yet
	Pending,
	/// The round failed
	Failed {
		/// Human-readable description of the failure
		error: String,
	},
	/// User canceled the round
	Canceled,
}
97
98impl RoundStatus {
99	/// Whether this is the final state and it won't change anymore
100	pub fn is_final(&self) -> bool {
101		match self {
102			Self::Confirmed { .. } => true,
103			Self::Unconfirmed { .. } => false,
104			Self::Pending => false,
105			Self::Failed { .. } => true,
106			Self::Canceled => true,
107		}
108	}
109
110	/// Whether it looks like the round succeeded
111	pub fn is_success(&self) -> bool {
112		match self {
113			Self::Confirmed { .. } => true,
114			Self::Unconfirmed { .. } => true,
115			Self::Pending => false,
116			Self::Failed { .. } => false,
117			Self::Canceled => false,
118		}
119	}
120}
121
/// State of the progress of a round participation
///
/// An instance of this struct is kept all the way from the intention of joining
/// the next round, until either the round fully confirms or it fails and we are
/// sure it won't have any effect on our wallet.
///
/// As soon as we have signed forfeit txs for the round, we keep track of this
/// round attempt until we see another attempt we participated in confirm or
/// we gain confidence that the failed attempt will never confirm.
//
//TODO(stevenroose) move the id in here and have the state persist itself with the wallet
// to have better control. this way we can touch db before we sent forfeit sigs
pub struct RoundState {
	/// Round is fully done
	///
	/// Set by [`RoundState::sync`] right after the successful round was
	/// persisted. Once set, `sync` on a finished flow reports `Confirmed`
	/// without doing any further work.
	pub(crate) done: bool,

	/// Our participation in this round
	pub(crate) participation: RoundParticipation,

	/// The flow of the round in case it is still ongoing with the server
	pub(crate) flow: RoundFlowState,

	/// The new output vtxos of this round participation
	///
	/// After we finish the interactive part, we fill this with the uncompleted
	/// VTXOs which we then try to complete with the unlock preimage.
	pub(crate) new_vtxos: Vec<Vtxo<Full>>,

	/// Whether we sent our forfeit signatures to the server
	///
	/// If we did this and the server refused to reveal our new VTXOs,
	/// we will be forced to exit.
	///
	/// Set by [`RoundState::sync`] when a step fails with
	/// `HarkForfeitError::SentForfeits`.
	//TODO(stevenroose) implement exit when this is true and we can't make progress
	// probably based on the input vtxos becoming close to expiry
	pub(crate) sent_forfeit_sigs: bool,

	/// The ID of the [Movement] associated with this round
	///
	/// When `None`, the funding txid updates in this file are skipped.
	pub(crate) movement_id: Option<MovementId>,
}
161
162impl RoundState {
163	fn new_interactive(
164		participation: RoundParticipation,
165		movement_id: Option<MovementId>,
166	) -> Self {
167		Self {
168			participation,
169			movement_id,
170			flow: RoundFlowState::InteractivePending,
171			new_vtxos: Vec::new(),
172			sent_forfeit_sigs: false,
173			done: false,
174		}
175	}
176
177	fn new_delegated(
178		participation: RoundParticipation,
179		unlock_hash: UnlockHash,
180		movement_id: Option<MovementId>,
181	) -> Self {
182		Self {
183			participation,
184			movement_id,
185			flow: RoundFlowState::NonInteractivePending { unlock_hash },
186			new_vtxos: Vec::new(),
187			sent_forfeit_sigs: false,
188			done: false,
189		}
190	}
191
	/// Our participation in this round
	///
	/// Returns a reference to the inputs and requested outputs this state was
	/// created with.
	pub fn participation(&self) -> &RoundParticipation {
		&self.participation
	}
196
197	/// the unlock hash if already known
198	pub fn unlock_hash(&self) -> Option<UnlockHash> {
199		match self.flow {
200			RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
201			RoundFlowState::InteractivePending => None,
202			RoundFlowState::InteractiveOngoing { .. } => None,
203			RoundFlowState::Failed { .. } => None,
204			RoundFlowState::Canceled => None,
205			RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
206		}
207	}
208
209	pub fn funding_tx(&self) -> Option<&Transaction> {
210		match self.flow {
211			RoundFlowState::NonInteractivePending { .. } => None,
212			RoundFlowState::InteractivePending => None,
213			RoundFlowState::InteractiveOngoing { .. } => None,
214			RoundFlowState::Failed { .. } => None,
215			RoundFlowState::Canceled => None,
216			RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
217		}
218	}
219
220	/// Whether the interactive part of the round is still ongoing
221	pub fn ongoing_participation(&self) -> bool {
222		match self.flow {
223			RoundFlowState::NonInteractivePending { .. } => false,
224			RoundFlowState::InteractivePending => true,
225			RoundFlowState::InteractiveOngoing { .. } => true,
226			RoundFlowState::Failed { .. } => false,
227			RoundFlowState::Canceled => false,
228			RoundFlowState::Finished { .. } => false,
229		}
230	}
231
232	/// Tries to cancel the round and returns whether it was succesfully canceled
233	/// or if it was already canceled or failed
234	pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
235		let ret = match self.flow {
236			RoundFlowState::NonInteractivePending { .. } => {
237				//TODO(stevenroose) we have to cancel with server
238				bail!("it is currently not yet possible to cancel pending delegated rounds");
239			},
240			RoundFlowState::Canceled => true,
241			RoundFlowState::Failed { .. } => true,
242			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
243				self.flow = RoundFlowState::Canceled;
244				true
245			},
246			RoundFlowState::Finished { .. } => false,
247		};
248		if ret {
249			persist_round_failure(wallet, &self.participation, self.movement_id).await
250				.context("failed to persist round failure for cancelation")?;
251		}
252		Ok(ret)
253	}
254
255	async fn try_start_attempt(
256		&mut self,
257		wallet: &Wallet,
258		attempt: &RoundAttempt,
259	) {
260		// Drop the previous attempt's stashed nonces: the new attempt
261		// regenerates cosign keys, so the old key becomes unreachable.
262		if let RoundFlowState::InteractiveOngoing {
263			state: AttemptState::AwaitingUnsignedVtxoTree { ref cosign_keys, .. },
264			..
265		} = self.flow {
266			if let Some(k) = cosign_keys.first() {
267				wallet.inner.round_secret_nonces.forget(&k.public_key());
268			}
269		}
270
271		match start_attempt(wallet, &self.participation, attempt).await {
272			Ok(state) => {
273				self.flow = RoundFlowState::InteractiveOngoing {
274					round_seq: attempt.round_seq,
275					attempt_seq: attempt.attempt_seq,
276					state: state,
277				};
278			},
279			Err(e) => {
280				self.flow = RoundFlowState::Failed {
281					error: format!("{:#}", e),
282				};
283			},
284		}
285	}
286
	/// Processes the given event and returns true if some update was made to the state
	///
	/// Only the interactive flow states react to events:
	/// - while pending, we only join on an `Attempt` event with attempt
	///   sequence number 0 and ignore everything else;
	/// - while ongoing, the event is first checked against the round and
	///   attempt we are in, and is then handed to `progress_attempt`.
	///
	/// All other flow states ignore the event and return false. Failures are
	/// not returned to the caller; they are recorded by moving the flow to
	/// [`RoundFlowState::Failed`], which counts as an update.
	pub async fn process_event(
		&mut self,
		wallet: &Wallet,
		event: &RoundEvent,
	) -> bool {
		// Every arm below returns from the function, so the match never yields
		// a value; the `Infallible` annotation makes the compiler enforce that.
		let _: Infallible = match self.flow {
			RoundFlowState::InteractivePending => {
				if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
					trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
					self.try_start_attempt(wallet, e).await;
					return true;
				} else {
					trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
						event.kind(), event.round_seq(), event.attempt_seq(),
					);
					return false;
				}
			},
			RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
				// here we catch the cases where we're in a wrong flow

				// the server explicitly failed the round we are in
				if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
					warn!("Round {} failed by server", round_seq);
					self.flow = RoundFlowState::Failed {
						error: format!("round {} failed by server", round_seq),
					};
					return true;
				}

				if event.round_seq() > round_seq {
					// new round started, we don't support multiple parallel rounds,
					// this means we failed
					self.flow = RoundFlowState::Failed {
						error: format!("round {} started while we were on {}",
							event.round_seq(), round_seq,
						),
					};
					return true;
				}

				if event.attempt_seq() < attempt_seq {
					trace!("ignoring replayed message from old attempt");
					return false;
				}

				// a newer attempt replaces the one we are in
				if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
					trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
					self.try_start_attempt(wallet, e).await;
					return true;
				}
				trace!("Processing event {} for round attempt {}:{} in state {}",
					event.kind(), round_seq, attempt_seq, state.kind(),
				);

				return match progress_attempt(state, wallet, &self.participation, event).await {
					AttemptProgressResult::NotUpdated => false,
					AttemptProgressResult::Updated { new_state } => {
						*state = new_state;
						true
					},
					AttemptProgressResult::Failed(e) => {
						warn!("Round failed with error: {:#}", e);
						self.flow = RoundFlowState::Failed {
							error: format!("{:#}", e),
						};
						true
					},
					AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
						self.new_vtxos = vtxos;
						let funding_txid = funding_tx.compute_txid();
						self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
						// best effort: a failure to record the funding txid on
						// the movement is only logged
						if let Some(mid) = self.movement_id {
							if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
								warn!("Error updating the round funding txid: {:#}", e);
							}
						}
						true
					},
				};
			},
			RoundFlowState::NonInteractivePending { .. }
				| RoundFlowState::Finished { .. }
				| RoundFlowState::Failed { .. }
				| RoundFlowState::Canceled => return false,
		};
	}
374
	/// Sync the round's status and return it
	///
	/// When success or failure is returned, the round state can be eliminated
	///
	/// What happens depends on the current flow state:
	/// - finished and already done: reports `Confirmed` without further work;
	/// - interactive pending or ongoing: reports `Pending`;
	/// - failed or canceled: persists the round failure and reports it;
	/// - delegated pending: asks `progress_delegated` to advance the round;
	/// - finished but not done: checks the funding tx confirmations and, once
	///   confirmed, runs the forfeit swap and persists the success.
	///
	/// # Errors
	///
	/// Returns an error when persisting, checking the funding tx or advancing
	/// the round fails. If such a failure happened after forfeit signatures
	/// were sent, `sent_forfeit_sigs` is set before the error is returned.
	//TODO(stevenroose) make RoundState manage its own db record
	pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
		match self.flow {
			// success was already persisted by an earlier call
			RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
				Ok(RoundStatus::Confirmed {
					funding_txid: funding_tx.compute_txid(),
				})
			},

			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
				Ok(RoundStatus::Pending)
			},
			RoundFlowState::Failed { ref error } => {
				persist_round_failure(wallet, &self.participation, self.movement_id).await
					.context("failed to persist round failure")?;
				Ok(RoundStatus::Failed { error: error.clone() })
			},
			RoundFlowState::Canceled => {
				persist_round_failure(wallet, &self.participation, self.movement_id).await
					.context("failed to persist round failure")?;
				Ok(RoundStatus::Canceled)
			},

			RoundFlowState::NonInteractivePending { unlock_hash } => {
				match progress_delegated(wallet, &self.participation, unlock_hash).await {
					Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
					Ok(HarkProgressResult::RoundNotFound) => {
						self.handle_round_not_found(wallet).await
					},
					Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
						let funding_txid = funding_tx.compute_txid();
						self.new_vtxos = new_vtxos;
						self.flow = RoundFlowState::Finished {
							funding_tx: funding_tx.clone(),
							unlock_hash: unlock_hash,
						};

						persist_round_success(
							wallet,
							&self.participation,
							self.movement_id,
							&self.new_vtxos,
							&funding_tx,
						).await.context("failed to store successful round in DB!")?;

						// only mark done once the success is stored
						self.done = true;

						Ok(RoundStatus::Confirmed { funding_txid })
					},
					Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
						if let Some(mid) = self.movement_id {
							update_funding_txid(wallet, mid, funding_txid).await
								.context("failed to update funding txid in DB")?;
						}
						Ok(RoundStatus::Unconfirmed { funding_txid })
					},

					//TODO(stevenroose) should we mark as failed for these cases?

					Err(HarkForfeitError::Err(e)) => {
						//TODO(stevenroose) we failed here but we might actualy be able to
						// succeed if we retry. should we implement some kind of limited
						// retry after which we mark as failed?
						Err(e.context("error progressing delegated round"))
					},
					Err(HarkForfeitError::SentForfeits(e)) => {
						// remember that the server may hold our forfeit signatures
						self.sent_forfeit_sigs = true;
						Err(e.context("error progressing delegated round \
							after sending forfeit tx signatures"))
					},
				}
			},
			// interactive part finished, but didn't forfeit yet
			RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
				let funding_txid = funding_tx.compute_txid();
				let confirmed = check_funding_tx_confirmations(
					wallet, funding_txid, &funding_tx,
				).await.context("error checking funding tx confirmations")?;
				if !confirmed {
					trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
					return Ok(RoundStatus::Unconfirmed { funding_txid });
				}

				match hark_vtxo_swap(
					wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
				).await {
					Ok(()) => {
						persist_round_success(
							wallet,
							&self.participation,
							self.movement_id,
							&self.new_vtxos,
							&funding_tx,
						).await.context("failed to store successful round in DB!")?;

						// only mark done once the success is stored
						self.done = true;

						Ok(RoundStatus::Confirmed { funding_txid })
					},
					Err(HarkForfeitError::Err(e)) => {
						Err(e.context("error forfeiting VTXOs after round"))
					},
					Err(HarkForfeitError::SentForfeits(e)) => {
						// remember that the server may hold our forfeit signatures
						self.sent_forfeit_sigs = true;
						Err(e.context("error after having signed and sent \
							forfeit signatures to server"))
					},
				}
			},
		}
	}
489
490	/// Once we know the signed round funding tx, this returns the output VTXOs
491	/// for this round.
492	pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
493		if self.new_vtxos.is_empty() {
494			None
495		} else {
496			Some(&self.new_vtxos)
497		}
498	}
499
500	/// Returns the input VTXOs that are locked in this round, but only
501	/// if no output VTXOs were issued yet.
502	pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
503		//TODO(stevenroose) consider if we can't just drop the state after forfeit exchange
504		match self.flow {
505			RoundFlowState::NonInteractivePending { .. }
506				| RoundFlowState::InteractivePending
507				| RoundFlowState::InteractiveOngoing { .. }
508			=> {
509				&self.participation.inputs
510			},
511			RoundFlowState::Finished { .. } => if self.done {
512				// inputs already unlocked
513				&[]
514			} else {
515				&self.participation.inputs
516			},
517			RoundFlowState::Failed { .. }
518				| RoundFlowState::Canceled
519			=> {
520				// inputs already unlocked
521				&[]
522			},
523		}
524	}
525
526	/// The balance pending in this round
527	///
528	/// This becomes zero once the new round VTXOs are unlocked.
529	pub fn pending_balance(&self) -> Amount {
530		if self.done {
531			return Amount::ZERO;
532		}
533
534		match self.flow {
535			RoundFlowState::NonInteractivePending { .. }
536				| RoundFlowState::InteractivePending
537				| RoundFlowState::InteractiveOngoing { .. }
538				| RoundFlowState::Finished { .. }
539			=> {
540				self.participation.outputs.iter().map(|o| o.amount).sum()
541			},
542			RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
543				Amount::ZERO
544			},
545		}
546	}
547
548	/// Handle the case where the server reports our round participation as not found.
549	///
550	/// If we sent forfeit signatures (which only happens after the round was
551	/// confirmed), this is adversarial — trigger unilateral exit.
552	/// If we never sent forfeits, the server can't steal, so unlock the VTXOs
553	/// and verify with the server that they're still considered spendable.
554	async fn handle_round_not_found(
555		&mut self,
556		wallet: &Wallet,
557	) -> anyhow::Result<RoundStatus> {
558		info!("Server reports round participation not found (no forfeits sent)");
559		self.flow = RoundFlowState::Failed {
560			error: "server reports round participation not found".into(),
561		};
562		persist_round_failure(wallet, &self.participation, self.movement_id).await
563			.context("failed to persist round failure")?;
564
565		Ok(RoundStatus::Failed {
566			error: "server reports round participation not found".into(),
567		})
568	}
569}
570
/// The state of the process flow of a round
///
/// This tracks the progress of the interactive part of the round, from
/// waiting to start until finishing either successfully or with a failure.
pub enum RoundFlowState {
	/// We don't do flow and we just wait for the round to finish
	NonInteractivePending {
		/// The unlock hash identifying our delegated participation
		unlock_hash: UnlockHash,
	},

	/// Waiting for round to happen
	InteractivePending,
	/// Interactive part ongoing
	InteractiveOngoing {
		/// Sequence number of the round we are participating in
		round_seq: RoundSeq,
		/// Sequence number of the attempt we joined within that round
		attempt_seq: usize,
		/// Our progress within that attempt
		state: AttemptState,
	},

	/// Interactive part finished, waiting for confirmation
	///
	/// A delegated round also ends up here once `sync` sees it complete.
	Finished {
		/// The round's funding transaction
		funding_tx: Transaction,
		/// The unlock hash of our participation in the round
		unlock_hash: UnlockHash,
	},

	/// Failed during round
	Failed {
		/// Human-readable description of the failure
		error: String,
	},

	/// User canceled round
	Canceled,
}
604
/// The state of a single round attempt
///
/// For each attempt that we participate in, we keep the state of our concrete
/// participation.
pub enum AttemptState {
	/// Waiting for an attempt to start
	//
	// NOTE(review): this variant is not constructed in the visible part of
	// this file -- confirm it is still in use.
	AwaitingAttempt,
	/// We submitted our payment for this attempt (see `start_attempt`)
	AwaitingUnsignedVtxoTree {
		/// The cosign keypairs generated for this attempt, one per requested
		/// output, in the order of the outputs
		cosign_keys: Vec<Keypair>,
		/// The unlock hash the server returned for our submission
		unlock_hash: UnlockHash,
	},
	/// Presumably entered after handling the unsigned VTXO tree, while
	/// waiting for the round to finish -- the transition is not in this view
	AwaitingFinishedRound {
		unsigned_round_tx: Transaction,
		vtxos_spec: VtxoTreeSpec,
		unlock_hash: UnlockHash,
	},
}
621
622impl AttemptState {
623	/// The state kind represented as a string
624	fn kind(&self) -> &'static str {
625		match self {
626			Self::AwaitingAttempt => "AwaitingAttempt",
627			Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
628			Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
629		}
630	}
631}
632
/// Result from trying to progress an ongoing round attempt
///
/// Consumed by [`RoundState::process_event`].
enum AttemptProgressResult {
	/// The interactive part of the round finished
	///
	/// The VTXOs are stored as the round's new VTXOs and the flow moves to
	/// [`RoundFlowState::Finished`].
	Finished {
		funding_tx: Transaction,
		vtxos: Vec<Vtxo<Full>>,
		unlock_hash: UnlockHash,
	},
	/// The attempt failed; the flow moves to [`RoundFlowState::Failed`]
	Failed(anyhow::Error),
	/// When the state changes, this variant is returned
	///
	/// It carries the new [`AttemptState`], which replaces the current one.
	Updated {
		new_state: AttemptState,
	},
	/// The event did not change anything
	NotUpdated,
}
651
652/// Participate in the new round attempt by submitting our round participation
653async fn start_attempt(
654	wallet: &Wallet,
655	participation: &RoundParticipation,
656	event: &RoundAttempt,
657) -> anyhow::Result<AttemptState> {
658	let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
659
660	// Assign cosign pubkeys to the payment requests.
661	let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
662		.take(participation.outputs.len())
663		.collect::<Vec<_>>();
664
665	// Prepare round participation info.
666	// For each of our requested vtxo output, we need a set of public and secret nonces.
667	let cosign_nonces = cosign_keys.iter()
668		.map(|key| {
669			let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
670			let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
671			for _ in 0..ark_info.nb_round_nonces {
672				let (s, p) = musig::nonce_pair(key);
673				secs.push(s);
674				pubs.push(p);
675			}
676			(secs, pubs)
677		})
678		.take(participation.outputs.len())
679		.collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
680
681
682	// The round has now started. We can submit our payment.
683	debug!("Submitting payment request with {} inputs and {} vtxo outputs",
684		participation.inputs.len(), participation.outputs.len(),
685	);
686
687	// Build signed requests with mailbox IDs
688	let unblinded_mailbox_id = wallet.mailbox_identifier();
689	let signed_reqs = participation.outputs.iter()
690		.zip(cosign_keys.iter())
691		.zip(cosign_nonces.iter())
692		.map(|((req, cosign_key), (_sec, pub_nonces))| {
693			SignedVtxoRequest {
694				vtxo: req.clone(),
695				cosign_pubkey: cosign_key.public_key(),
696				nonces: pub_nonces.clone(),
697			}
698		})
699		.collect::<Vec<_>>();
700
701	let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
702	for vtxo in participation.inputs.iter() {
703		let keypair = wallet.get_vtxo_key(vtxo).await
704			.map_err(HarkForfeitError::Err)?;
705		input_vtxos.push(protos::InputVtxo {
706			vtxo_id: vtxo.id().to_bytes().to_vec(),
707			attestation: {
708				let attestation = RoundAttemptAttestation::new(
709					event.challenge, vtxo.id(), &signed_reqs, &keypair,
710				);
711				attestation.serialize()
712			},
713		});
714	}
715
716	// Register VTXO transaction chains with server before round participation
717	wallet.register_vtxo_transactions_with_server(&participation.inputs).await
718		.map_err(HarkForfeitError::Err)?;
719
720	let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
721		input_vtxos: input_vtxos,
722		vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
723		#[allow(deprecated)]
724		offboard_requests: vec![],
725		unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
726	}).await.context("Ark server refused our payment submission")?;
727	let unlock_hash = UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?;
728
729	// Stash nonces in memory only. Empty `cosign_keys` means no VTXO
730	// outputs (offboard-only) — nothing to stash.
731	if let Some(k) = cosign_keys.first() {
732		wallet.inner.round_secret_nonces.stash(
733			k.public_key(),
734			cosign_nonces.into_iter().map(|(sec, _pub)| sec).collect(),
735		);
736	}
737
738	Ok(AttemptState::AwaitingUnsignedVtxoTree { unlock_hash, cosign_keys })
739}
740
/// just an internal type; need Error trait to work with anyhow
///
/// It records whether a failure happened before or after our forfeit
/// signatures were handed to the server. [`RoundState::sync`] sets
/// `sent_forfeit_sigs` when it sees the `SentForfeits` variant.
#[derive(Debug, thiserror::Error)]
enum HarkForfeitError {
	/// An error happened after we sent forfeit signatures to the server
	#[error("error after forfeits were sent")]
	SentForfeits(#[source] anyhow::Error),
	/// An error happened before we sent forfeit signatures to the server
	#[error("error before forfeits were sent")]
	Err(#[source] anyhow::Error),
}
751
752async fn hark_cosign_leaf(
753	wallet: &Wallet,
754	srv: &mut ServerConnection,
755	funding_tx: &Transaction,
756	vtxo: &mut Vtxo<Full>,
757) -> anyhow::Result<()> {
758	let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
759		.context("error fetching keypair").map_err(HarkForfeitError::Err)?
760		.with_context(|| format!(
761			"keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
762		))?.1;
763	let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
764	let cosign_resp = srv.client.request_leaf_vtxo_cosign(
765		protos::LeafVtxoCosignRequest::from(cosign_req),
766	).await
767		.with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
768		.into_inner().try_into()
769		.context("bad leaf vtxo cosign response")?;
770	ensure!(ctx.finalize(vtxo, cosign_resp),
771		"failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
772	);
773
774	Ok(())
775}
776
777/// Finish the hArk VTXO swap protocol
778///
779/// This includes:
780/// - requesting cosignature of the locked hArk leaves
781/// - sending forfeit txs to the server in return for the unlock preimage
782///
783/// NB all the actions in this function are idempotent, meaning that the server
784/// allows them to be done multiple times. this means that if this function calls
785/// fails, it's safe to just call it again at a later time
786async fn hark_vtxo_swap(
787	wallet: &Wallet,
788	participation: &RoundParticipation,
789	output_vtxos: &mut [Vtxo<Full>],
790	funding_tx: &Transaction,
791	unlock_hash: UnlockHash,
792) -> Result<(), HarkForfeitError> {
793	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
794
795	// before we start make sure the server has our input vtxo signatures
796	wallet.register_vtxo_transactions_with_server(&participation.inputs).await
797		.context("couldn't send our input vtxo transactions to server")
798		.map_err(HarkForfeitError::Err)?;
799
800	// first get the leaves signed
801	for vtxo in output_vtxos.iter_mut() {
802		hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
803			.map_err(HarkForfeitError::Err)?;
804	}
805
806	// then do the forfeit dance
807
808	let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
809		unlock_hash: unlock_hash.to_byte_array().to_vec(),
810		vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
811	}).await
812		.context("request forfeits nonces call failed")
813		.map_err(HarkForfeitError::Err)?
814		.into_inner().public_nonces.into_iter()
815		.map(|b| musig::PublicNonce::from_bytes(b))
816		.collect::<Result<Vec<_>, _>>()
817		.context("invalid forfeit nonces")
818		.map_err(HarkForfeitError::Err)?;
819
820	if server_nonces.len() != participation.inputs.len() {
821		return Err(HarkForfeitError::Err(anyhow!(
822			"server sent {} nonce pairs, expected {}",
823			server_nonces.len(), participation.inputs.len(),
824		)));
825	}
826
827	let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
828	for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
829		let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
830			.ok().flatten().with_context(|| format!(
831				"failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
832			)).map_err(HarkForfeitError::Err)?.1;
833		forfeit_bundles.push(HashLockedForfeitBundle::new(
834			input, unlock_hash, &user_key, &nonces,
835		))
836	}
837
838	let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
839		forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
840	}).await
841		.context("forfeit vtxos call failed")
842		.map_err(HarkForfeitError::SentForfeits)?
843		.into_inner().unlock_preimage.as_slice().try_into()
844		.context("invalid preimage length")
845		.map_err(HarkForfeitError::SentForfeits)?;
846
847	for vtxo in output_vtxos.iter_mut() {
848		if !vtxo.provide_unlock_preimage(preimage) {
849			return Err(HarkForfeitError::SentForfeits(anyhow!(
850				"invalid preimage {} for vtxo {} with supposed unlock hash {}",
851				preimage.as_hex(), vtxo.id(), unlock_hash,
852			)));
853		}
854
855		// then validate the vtxo works
856		vtxo.validate(&funding_tx).with_context(|| format!(
857			"new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
858		)).map_err(HarkForfeitError::SentForfeits)?;
859	}
860
861	Ok(())
862}
863
864fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
865	match vtxo.validate(funding_tx) {
866		Err(VtxoValidationError::GenesisTransition {
867			genesis_idx, genesis_len, transition_kind, ..
868		}) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
869		Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
870			vtxo.serialize_hex(),
871		)),
872		Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
873	}
874}
875
876fn check_round_matches_participation(
877	part: &RoundParticipation,
878	new_vtxos: &[Vtxo<Full>],
879	funding_tx: &Transaction,
880) -> anyhow::Result<()> {
881	ensure!(new_vtxos.len() == part.outputs.len(),
882		"unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
883	);
884
885	for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
886		ensure!(vtxo.amount() == req.amount,
887			"unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
888		);
889		ensure!(*vtxo.policy() == req.policy,
890			"unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
891		);
892
893		// We accept the VTXO if only the hArk transition (last) failure happens
894		check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
895	}
896
897	Ok(())
898}
899
900/// Check the confirmation status of a funding tx
901///
902/// Returns true if the funding tx is confirmed deeply enough for us to accept it.
903/// The required number of confirmations depends on the wallet's configuration.
904///
905/// Returns false if the funding tx seems valid but not confirmed yet.
906///
907/// Returns an error if the chain source fails or if we can't submit the tx to the
908/// mempool, suggesting it might be double spent.
909async fn check_funding_tx_confirmations(
910	wallet: &Wallet,
911	funding_txid: Txid,
912	funding_tx: &Transaction,
913) -> anyhow::Result<bool> {
914	let tip = wallet.inner.chain.tip().await.context("chain source error")?;
915	let conf_height = tip - wallet.inner.config.round_tx_required_confirmations + 1;
916	let tx_status = wallet.inner.chain.tx_status(funding_txid).await.context("chain source error")?;
917	trace!("Round funding tx {} confirmation status: {:?} (tip={})",
918		funding_txid, tx_status, tip,
919	);
920	match tx_status {
921		TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
922		TxStatus::Mempool | TxStatus::Confirmed(_) => {
923			if wallet.inner.config.round_tx_required_confirmations == 0 {
924				debug!("Accepting round funding tx without confirmations because of configuration");
925				Ok(true)
926			} else {
927				trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
928				Ok(false)
929			}
930		},
931		TxStatus::NotFound => {
932			// let's try to submit it to our mempool
933			//TODO(stevenroose) change this to an explicit "testmempoolaccept" so that we can
934			// reliably distinguish the cases of our chain source having issues and the tx
935			// actually being rejected which suggests the round was double-spent
936			if let Err(e) = wallet.inner.chain.broadcast_tx(&funding_tx).await {
937				Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
938					funding_txid, serialize_hex(funding_tx), e,
939				))
940			} else {
941				trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
942				Ok(false)
943			}
944		},
945	}
946}
947
/// Outcome of one attempt to progress a delegated (hArk) round participation.
enum HarkProgressResult {
	/// The server reports the round participation as still pending.
	RoundPending,
	/// The server answered the status request with gRPC `NotFound`.
	RoundNotFound,
	/// The round has a funding tx, but it is not confirmed deeply enough yet.
	FundingTxUnconfirmed {
		/// Txid of the round funding transaction we are waiting on.
		funding_txid: Txid,
	},
	/// The hArk forfeit swap completed successfully.
	Ok {
		/// The round funding transaction.
		funding_tx: Transaction,
		/// Our new output VTXOs, as returned after the swap.
		new_vtxos: Vec<Vtxo<Full>>,
	},
}
959
960async fn progress_delegated(
961	wallet: &Wallet,
962	participation: &RoundParticipation,
963	unlock_hash: UnlockHash,
964) -> Result<HarkProgressResult, HarkForfeitError> {
965	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
966
967	let resp = match srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
968		unlock_hash: unlock_hash.to_byte_array().to_vec(),
969	}).await {
970		Ok(resp) => resp.into_inner(),
971		Err(err) if err.code() == tonic::Code::NotFound => {
972			return Ok(HarkProgressResult::RoundNotFound);
973		},
974		Err(err) => {
975			return Err(HarkForfeitError::Err(
976				anyhow::Error::from(err).context("error checking round participation status"),
977			));
978		},
979	};
980	let status = protos::RoundParticipationStatus::try_from(resp.status)
981		.context("unknown status from server")
982		.map_err(HarkForfeitError::Err)	?;
983
984	if status == protos::RoundParticipationStatus::RoundPartPending {
985		trace!("Hark round still pending");
986		return Ok(HarkProgressResult::RoundPending);
987	}
988
989	// Since we got here, we clearly don't think we're finished.
990	// So even if the server thinks we did the dance before, we need the
991	// cosignature on the leaf tx so we need to do the dance again.
992	// "Guilty feet have got no rhythm."
993	if status == protos::RoundParticipationStatus::RoundPartReleased {
994		let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
995		warn!("Server says preimage was already released for hArk participation \
996			with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
997		);
998	}
999
1000	let funding_tx_bytes = resp.round_funding_tx
1001		.context("funding txid should be provided when status is not pending")
1002		.map_err(HarkForfeitError::Err)?;
1003	let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
1004		.context("invalid funding txid")
1005		.map_err(HarkForfeitError::Err)?;
1006	let funding_txid = funding_tx.compute_txid();
1007	trace!("Funding tx for round participation with unlock hash {}: {} ({})",
1008		unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
1009	);
1010
1011	// Check the confirmation status of the funding tx
1012	match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
1013		Ok(true) => {},
1014		Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
1015		Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
1016	}
1017
1018	let mut new_vtxos = resp.output_vtxos.into_iter()
1019		.map(|v| <Vtxo<Full>>::deserialize(&v))
1020		.collect::<Result<Vec<_>, _>>()
1021		.context("invalid output VTXOs from server")
1022		.map_err(HarkForfeitError::Err)?;
1023
1024	// Check that the vtxos match our participation in the exact order
1025	check_round_matches_participation(participation, &new_vtxos, &funding_tx)
1026		.context("new VTXOs received from server don't match our participation")
1027		.map_err(HarkForfeitError::Err)?;
1028
1029	hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
1030		.context("error forfeiting hArk VTXOs")
1031		.map_err(HarkForfeitError::SentForfeits)?;
1032
1033	Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
1034}
1035
1036async fn progress_attempt(
1037	state: &mut AttemptState,
1038	wallet: &Wallet,
1039	part: &RoundParticipation,
1040	event: &RoundEvent,
1041) -> AttemptProgressResult {
1042	// we will match only the states and messages required to make progress,
1043	// all else we ignore, except an unexpected finish
1044
1045	match (state, event) {
1046
1047		(
1048			AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, unlock_hash },
1049			RoundEvent::VtxoProposal(e),
1050		) => {
1051			trace!("Received VtxoProposal: {:#?}", e);
1052
1053			// Missing nonces means we restarted before signing —
1054			// abandon the attempt rather than reuse on retry.
1055			let secret_nonces = if let Some(first) = cosign_keys.first() {
1056				match wallet.inner.round_secret_nonces.take(&first.public_key()) {
1057					Some(n) => n,
1058					None => return AttemptProgressResult::Failed(anyhow!(
1059						"secret cosign nonces unavailable (likely after a restart); \
1060						 abandoning round attempt to avoid nonce reuse",
1061					)),
1062				}
1063			} else {
1064				vec![]
1065			};
1066
1067			match sign_vtxo_tree(
1068				wallet,
1069				part,
1070				&cosign_keys,
1071				secret_nonces,
1072				&e.unsigned_round_tx,
1073				&e.vtxos_spec,
1074				&e.cosign_agg_nonces,
1075			).await {
1076				Ok(()) => {
1077					AttemptProgressResult::Updated {
1078						new_state: AttemptState::AwaitingFinishedRound {
1079							unsigned_round_tx: e.unsigned_round_tx.clone(),
1080							vtxos_spec: e.vtxos_spec.clone(),
1081							unlock_hash: *unlock_hash,
1082						},
1083					}
1084				},
1085				Err(e) => {
1086					trace!("Error signing VTXO tree: {:#}", e);
1087					AttemptProgressResult::Failed(e)
1088				},
1089			}
1090		},
1091
1092		(
1093			AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1094			RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1095		) => {
1096			if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1097				return AttemptProgressResult::Failed(anyhow!(
1098					"signed funding tx ({}) doesn't match tx received before ({})",
1099					signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1100				));
1101			}
1102
1103			if let Err(e) = wallet.inner.chain.broadcast_tx(&signed_round_tx).await {
1104				warn!("Failed to broadcast signed round tx: {:#}", e);
1105			}
1106
1107			match construct_new_vtxos(
1108				part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1109			).await {
1110				Ok(v) => AttemptProgressResult::Finished {
1111					funding_tx: signed_round_tx.clone(),
1112					vtxos: v,
1113					unlock_hash: *unlock_hash,
1114				},
1115				Err(e) => AttemptProgressResult::Failed(anyhow!(
1116					"failed to construct new VTXOs for round: {:#}", e,
1117				)),
1118			}
1119		},
1120
1121		(state, RoundEvent::Finished(RoundFinished { .. })) => {
1122			AttemptProgressResult::Failed(anyhow!(
1123				"unexpectedly received a finished round while we were in state {}",
1124				state.kind(),
1125			))
1126		},
1127
1128		(state, _) => {
1129			trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1130			AttemptProgressResult::NotUpdated
1131		},
1132	}
1133}
1134
1135async fn sign_vtxo_tree(
1136	wallet: &Wallet,
1137	participation: &RoundParticipation,
1138	cosign_keys: &[Keypair],
1139	secret_nonces: Vec<Vec<SecretNonce>>,
1140	unsigned_round_tx: &Transaction,
1141	vtxo_tree: &VtxoTreeSpec,
1142	cosign_agg_nonces: &[musig::AggregatedNonce],
1143) -> anyhow::Result<()> {
1144	let (mut srv, _) = wallet.require_server().await.context("server not available")?;
1145
1146	let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1147
1148	// Check that the proposal contains our inputs.
1149	let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1150	for vtxo_req in vtxo_tree.iter_vtxos() {
1151		if let Some(i) = my_vtxos.iter().position(|v| {
1152			v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1153		}) {
1154			my_vtxos.swap_remove(i);
1155		}
1156	}
1157	if !my_vtxos.is_empty() {
1158		bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1159	}
1160
1161	let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1162	trace!("Sending vtxo signatures to server...");
1163	// Sequential: SecretNonce is consume-once and not Clone, so we move
1164	// one Vec<SecretNonce> into cosign_branch per output. Going parallel
1165	// would require sharing the server connection across futures, which
1166	// isn't worth the complexity for a per-output RPC.
1167	for ((req, key), sec) in participation.outputs.iter().zip(cosign_keys).zip(secret_nonces) {
1168		let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1169		let part_sigs = unsigned_vtxos.cosign_branch(
1170			&cosign_agg_nonces, leaf_idx, key, sec,
1171		).context("failed to cosign branch: our request not part of tree")?;
1172
1173		info!("Sending {} partial vtxo cosign signatures for pk {}",
1174			part_sigs.len(), key.public_key(),
1175		);
1176
1177		srv.client.provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1178			pubkey: key.public_key().serialize().to_vec(),
1179			signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1180		}).await.context("error sending vtxo signatures")?;
1181	}
1182	trace!("Done sending vtxo signatures to server");
1183
1184	Ok(())
1185}
1186
1187async fn construct_new_vtxos(
1188	participation: &RoundParticipation,
1189	unsigned_round_tx: &Transaction,
1190	vtxo_tree: &VtxoTreeSpec,
1191	vtxo_cosign_sigs: &[schnorr::Signature],
1192) -> anyhow::Result<Vec<Vtxo<Full>>> {
1193	let round_txid = unsigned_round_tx.compute_txid();
1194	let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1195	let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1196
1197	// Validate the vtxo tree and cosign signatures.
1198	if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1199		// bad server!
1200		bail!("Received incorrect vtxo cosign signatures from server");
1201	}
1202
1203	let signed_vtxos = vtxo_tree
1204		.into_signed_tree(vtxo_cosign_sigs.to_vec())
1205		.into_cached_tree();
1206
1207	let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1208	let total_nb_expected_vtxos = expected_vtxos.len();
1209
1210	let mut new_vtxos = vec![];
1211	for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1212		if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1213			let vtxo = signed_vtxos.build_vtxo(idx);
1214
1215			// validate the received vtxos
1216			// This is more like a sanity check since we crafted them ourselves.
1217			check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1218				.context("constructed invalid vtxo from tree")?;
1219
1220			info!("New VTXO from round: {} ({}, {})",
1221				vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1222			);
1223
1224			new_vtxos.push(vtxo);
1225			expected_vtxos.swap_remove(expected_idx);
1226		}
1227	}
1228
1229	if !expected_vtxos.is_empty() {
1230		if expected_vtxos.len() == total_nb_expected_vtxos {
1231			// we must have done something wrong
1232			bail!("None of our VTXOs were present in round!");
1233		} else {
1234			bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1235				expected_vtxos.len(), expected_vtxos,
1236			);
1237		}
1238	}
1239	Ok(new_vtxos)
1240}
1241
1242//TODO(stevenroose) should be made idempotent
1243async fn persist_round_success(
1244	wallet: &Wallet,
1245	participation: &RoundParticipation,
1246	movement_id: Option<MovementId>,
1247	new_vtxos: &[Vtxo<Full>],
1248	funding_tx: &Transaction,
1249) -> anyhow::Result<()> {
1250	debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1251		new_vtxos.len(), movement_id,
1252	);
1253
1254	// we first try all actions that need to happen and only afterwards return errors
1255	// so that we achieve maximum success
1256
1257	let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1258		.context("failed to store new VTXOs");
1259	let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1260		.context("failed to mark input VTXOs as spent");
1261	let update_result = if let Some(mid) = movement_id {
1262		wallet.inner.movements.finish_movement_with_update(
1263			mid,
1264			MovementStatus::Successful,
1265			MovementUpdate::new()
1266				.produced_vtxos(new_vtxos)
1267				.metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1268		).await.context("failed to mark movement as finished")
1269	} else {
1270		Ok(())
1271	};
1272
1273	store_result?;
1274	spent_result?;
1275	update_result?;
1276
1277	Ok(())
1278}
1279
1280async fn persist_round_failure(
1281	wallet: &Wallet,
1282	participation: &RoundParticipation,
1283	movement_id: Option<MovementId>,
1284) -> anyhow::Result<()> {
1285	debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1286	let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1287	let finish_result = if let Some(movement_id) = movement_id {
1288		wallet.inner.movements.finish_movement(movement_id, MovementStatus::Failed).await
1289	} else {
1290		Ok(())
1291	};
1292	if let Err(e) = &finish_result {
1293		error!("Failed to mark movement as failed: {:#}", e);
1294	}
1295	match (unlock_result, finish_result) {
1296		(Ok(()), Ok(())) => Ok(()),
1297		(Err(e), _) => Err(e),
1298		(_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1299	}
1300}
1301
1302async fn update_funding_txid(
1303	wallet: &Wallet,
1304	movement_id: MovementId,
1305	funding_txid: Txid,
1306) -> anyhow::Result<()> {
1307	wallet.inner.movements.update_movement(
1308		movement_id,
1309		MovementUpdate::new()
1310			.metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1311	).await.context("Unable to update funding txid of round")
1312}
1313
1314/// In-memory store for MuSig2 secret cosign nonces used during round
1315/// signing. Entries are keyed by the first cosign pubkey of each round
1316/// attempt — that pubkey is freshly generated in [`start_attempt`],
1317/// uniquely identifies the attempt within a process, and is reachable
1318/// from the persisted `AttemptState::AwaitingUnsignedVtxoTree`.
1319///
1320/// Nonces never touch disk: persisting them risks signing twice with
1321/// the same nonce, which is unsafe with MuSig2.
1322#[derive(Default)]
1323pub struct RoundSecretNonces {
1324	inner: parking_lot::Mutex<HashMap<bitcoin::secp256k1::PublicKey, Vec<Vec<SecretNonce>>>>,
1325}
1326
1327impl RoundSecretNonces {
1328	pub fn new() -> Self {
1329		Self { inner: parking_lot::Mutex::new(HashMap::new()) }
1330	}
1331
1332	/// Insert nonces under the given key, replacing any previous entry.
1333	pub fn stash(
1334		&self,
1335		first_cosign_pubkey: bitcoin::secp256k1::PublicKey,
1336		nonces: Vec<Vec<SecretNonce>>,
1337	) {
1338		self.inner.lock().insert(first_cosign_pubkey, nonces);
1339	}
1340
1341	/// Remove and return the nonces stashed under the given key.
1342	/// `None` after a process restart or if the entry was never stashed.
1343	pub fn take(
1344		&self,
1345		first_cosign_pubkey: &bitcoin::secp256k1::PublicKey,
1346	) -> Option<Vec<Vec<SecretNonce>>> {
1347		self.inner.lock().remove(first_cosign_pubkey)
1348	}
1349
1350	/// Drop the entry under the given key without returning its
1351	/// contents. Use when a stashed attempt is being replaced by a new
1352	/// one and its key would otherwise be unreachable.
1353	pub fn forget(&self, first_cosign_pubkey: &bitcoin::secp256k1::PublicKey) {
1354		self.inner.lock().remove(first_cosign_pubkey);
1355	}
1356}
1357
1358impl Wallet {
1359	/// Load and lock a single given round state (by id), waiting for the lock.
1360	///
1361	/// Returns `Some(state)` if the round state is found and locked, `None`
1362	/// if it is not found after acquiring the lock.
1363	pub async fn lock_wait_round_state(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState>> {
1364		let guard = self.inner.lock_manager.lock(
1365			&format!("{}.round.{}", self.fingerprint(), id),
1366			ROUND_LOCK_TIMEOUT,
1367		).await.with_context(|| format!(
1368			"timed out waiting for lock on round state {} (wallet {})",
1369			id, self.fingerprint(),
1370		))?;
1371
1372		if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1373			return Ok(Some(state.lock(guard)));
1374		}
1375
1376		Ok(None)
1377	}
1378
1379	/// Ask the server when the next round is scheduled to start
1380	pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1381		let (mut srv, _) = self.require_server().await?;
1382		let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1383		Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1384	}
1385
1386	/// Start a new round participation
1387	///
1388	/// This function will store the state in the db and mark the VTXOs as locked.
1389	///
1390	/// ### Return
1391	///
1392	/// - By default, the returned state will be locked to prevent race conditions.
1393	/// To unlock the state, [StoredRoundState::unlock()] can be called.
1394	pub async fn join_next_round(
1395		&self,
1396		participation: RoundParticipation,
1397		movement_kind: Option<RoundMovement>,
1398	) -> anyhow::Result<StoredRoundState> {
1399		let movement = if let Some(kind) = movement_kind {
1400			Some(self.inner.movements.new_guarded_movement_with_update(
1401				Subsystem::ROUND,
1402				kind.to_string(),
1403				OnDropStatus::Failed,
1404				participation.to_movement_update()?
1405			).await?)
1406		} else {
1407			None
1408		};
1409		let movement_id = movement.as_ref().map(|m| m.id());
1410		let input_vtxos = participation.inputs.iter().map(|v| v.id()).collect::<Vec<_>>();
1411		let state = RoundState::new_interactive(participation, movement_id);
1412
1413		self.lock_vtxos(&input_vtxos, movement_id.map(|m| m.into())).await
1414			.context("failed to lock input VTXOs")?;
1415
1416		match (async || {
1417			let id = self.inner.db.store_round_state(&state).await?;
1418			Ok(self.lock_wait_round_state(id).await?
1419				.context("failed to lock fresh round state")?)
1420		})().await {
1421			Ok(state) => {
1422				if let Some(mut m) = movement {
1423					m.stop();
1424				}
1425				Ok(state)
1426			},
1427			Err(e) => {
1428				self.unlock_vtxos(&input_vtxos).await
1429					.context("failed to unlock input VTXOs")?;
1430				if let Some(mut m) = movement {
1431					m.fail().await.context("failed to mark movement as failed")?;
1432				}
1433				Err(e)
1434			},
1435		}
1436	}
1437
1438	/// Join next round in delegated mode
1439	pub async fn join_next_round_delegated(
1440		&self,
1441		participation: RoundParticipation,
1442		movement_kind: Option<RoundMovement>,
1443	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1444		let movement = if let Some(kind) = movement_kind {
1445			Some(self.inner.movements.new_guarded_movement_with_update(
1446				Subsystem::ROUND,
1447				kind.to_string(),
1448				OnDropStatus::Failed,
1449				participation.to_movement_update()?,
1450			).await?)
1451		} else {
1452			None
1453		};
1454		let movement_id = movement.as_ref().map(|m| m.id());
1455
1456		let input_ids = participation.inputs.iter().map(|v| v.id()).collect::<Vec<_>>();
1457		self.lock_vtxos(&input_ids, movement_id.map(|m| m.into())).await
1458			.context("error locking input VTXOs")?;
1459
1460		match self.join_next_round_delegated_inner(participation, movement_id).await {
1461			Ok(state) => {
1462				if let Some(mut m) = movement {
1463					m.stop();
1464				}
1465				Ok(state)
1466			},
1467			Err(e) => {
1468				self.unlock_vtxos(&input_ids).await
1469					.context("error unlocking input VTXOs")?;
1470				if let Some(mut m) = movement {
1471					m.fail().await.context("error marking movement as failed")?;
1472				}
1473				Err(e)
1474			},
1475		}
1476	}
1477
1478	async fn join_next_round_delegated_inner(
1479		&self,
1480		participation: RoundParticipation,
1481		movement_id: Option<MovementId>,
1482	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1483		let (mut srv, _) = self.require_server().await?;
1484
1485		// Get mailbox identifier for VTXO delivery
1486		let unblinded_mailbox_id = self.mailbox_identifier();
1487
1488		// Register VTXO transaction chains with server before round participation
1489		self.register_vtxo_transactions_with_server(&participation.inputs).await
1490			.context("failed to register input vtxo transactions with server")?;
1491
1492		// Generate attestations for input vtxos
1493		let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1494		for vtxo in participation.inputs.iter() {
1495			let keypair = self.get_vtxo_key(vtxo).await
1496				.context("failed to get vtxo keypair")?;
1497			input_vtxos.push(protos::InputVtxo {
1498				vtxo_id: vtxo.id().to_bytes().to_vec(),
1499				attestation: {
1500					let attestation = DelegatedRoundParticipationAttestation::new(
1501						vtxo.id(), &participation.outputs, &keypair,
1502					);
1503					attestation.serialize()
1504				},
1505			});
1506		}
1507
1508		// Build proto VtxoRequests
1509		let vtxo_requests = participation.outputs.iter()
1510			.map(|req|
1511				protos::VtxoRequest {
1512					policy: req.policy.serialize(),
1513					amount: req.amount.to_sat(),
1514			})
1515			.collect::<Vec<_>>();
1516
1517		// Submit participation to server and get unlock_hash
1518		let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1519			input_vtxos,
1520			vtxo_requests,
1521			unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
1522		}).await.context("error submitting round participation to server")?.into_inner();
1523
1524		let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1525			.context("invalid unlock hash from server")?;
1526
1527		let state = RoundState::new_delegated(participation, unlock_hash, movement_id);
1528
1529		info!("Delegated round participation submitted, it will automatically execute \
1530			when you next sync your wallet after the round happened \
1531			(and has sufficient confirmations).",
1532		);
1533
1534		let id = self.inner.db.store_round_state(&state).await?;
1535		Ok(StoredRoundState::new(id, state))
1536	}
1537
1538	/// Join an already-started round attempt interactively, submitting our
1539	/// participation synchronously.
1540	///
1541	/// Unlike [Wallet::join_next_round] — which stores a pending participation
1542	/// and waits for a round to start before submitting inside the round state
1543	/// machine — this submits to the in-flight `attempt` right away. This allows
1544	/// us to react to any unspendable VTXOs and exclude them from the refresh.
1545	pub(crate) async fn join_attempt_interactive(
1546		&self,
1547		participation: RoundParticipation,
1548		attempt: &RoundAttempt,
1549		movement_kind: Option<RoundMovement>,
1550	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1551		let movement = if let Some(kind) = movement_kind {
1552			Some(self.inner.movements.new_guarded_movement_with_update(
1553				Subsystem::ROUND,
1554				kind.to_string(),
1555				OnDropStatus::Failed,
1556				participation.to_movement_update()?,
1557			).await?)
1558		} else {
1559			None
1560		};
1561		let movement_id = movement.as_ref().map(|m| m.id());
1562
1563		let input_ids = participation.inputs.iter().map(|v| v.id()).collect::<Vec<_>>();
1564		self.lock_vtxos(&input_ids, movement_id.map(|m| m.into())).await
1565			.context("error locking input VTXOs")?;
1566
1567		match self.join_attempt_interactive_inner(participation, attempt, movement_id).await {
1568			Ok(state) => {
1569				if let Some(mut m) = movement {
1570					m.stop();
1571				}
1572				Ok(state)
1573			},
1574			Err(e) => {
1575				self.unlock_vtxos(&input_ids).await
1576					.context("error unlocking input VTXOs")?;
1577				if let Some(mut m) = movement {
1578					m.fail().await.context("error marking movement as failed")?;
1579				}
1580				Err(e)
1581			},
1582		}
1583	}
1584
1585	async fn join_attempt_interactive_inner(
1586		&self,
1587		participation: RoundParticipation,
1588		attempt: &RoundAttempt,
1589		movement_id: Option<MovementId>,
1590	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1591		// Submit synchronously to the in-flight attempt. On rejection the
1592		// tonic::Status (carrying the unusable input ids in its `identifiers`
1593		// metadata) propagates up the error chain untouched.
1594		let attempt_state = start_attempt(self, &participation, attempt).await?;
1595
1596		let mut state = RoundState::new_interactive(participation, movement_id);
1597		state.flow = RoundFlowState::InteractiveOngoing {
1598			round_seq: attempt.round_seq,
1599			attempt_seq: attempt.attempt_seq,
1600			state: attempt_state,
1601		};
1602
1603		let id = self.inner.db.store_round_state(&state).await?;
1604		Ok(StoredRoundState::new(id, state))
1605	}
1606
1607	/// Get all pending round states
1608	pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1609		self.inner.db.get_pending_round_state_ids().await
1610	}
1611
1612	/// Get all pending round states
1613	pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1614		let ids = self.inner.db.get_pending_round_state_ids().await?;
1615		let mut states = Vec::with_capacity(ids.len());
1616		for id in ids {
1617			if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1618				states.push(state);
1619			}
1620		}
1621		Ok(states)
1622	}
1623
1624	/// Balance locked in pending rounds
1625	pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1626		let mut ret = Amount::ZERO;
1627		for round in self.pending_round_states().await? {
1628			ret += round.state().pending_balance();
1629		}
1630		Ok(ret)
1631	}
1632
1633	/// Returns all VTXOs that are locked in a pending round
1634	///
1635	/// This excludes all input VTXOs for which the output VTXOs have already
1636	/// been created.
1637	pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1638		let mut ret = Vec::new();
1639		for round in self.pending_round_states().await? {
1640			let inputs = round.state().locked_pending_inputs();
1641			ret.reserve(inputs.len());
1642			for input in inputs {
1643				let v = self.get_vtxo_by_id(input.id()).await
1644					.context("unknown round input VTXO")?;
1645				ret.push(v);
1646			}
1647		}
1648		Ok(ret)
1649	}
1650
1651	/// Sync pending rounds that have finished but are waiting for confirmations
1652	pub async fn sync_pending_rounds(&self) -> anyhow::Result<HashMap<RoundStateId, RoundStatus>> {
1653		let states = self.pending_round_states().await?;
1654		if states.is_empty() {
1655			return Ok(HashMap::new());
1656		}
1657
1658		debug!("Syncing {} pending round states...", states.len());
1659
1660		let ret = Arc::new(parking_lot::Mutex::new(HashMap::with_capacity(states.len())));
1661		tokio_stream::iter(states).for_each_concurrent(10, |state| {
1662			let ret = ret.clone();
1663			async move {
1664				// not processing events here
1665				if state.state().ongoing_participation() {
1666					return;
1667				}
1668
1669				let mut state = match self.lock_wait_round_state(state.id()).await {
1670					Ok(Some(state)) => state,
1671					Ok(None) => return,
1672					Err(e) => {
1673						warn!("Error locking round state: {:#}", e);
1674						return;
1675					},
1676				};
1677
1678				let status = match state.state_mut().sync(self).await {
1679					Ok(s) => s,
1680					Err(e) => {
1681						warn!("Error syncing round: {:#}", e);
1682						return;
1683					},
1684				};
1685				trace!("Synced round #{}, status: {:?}", state.id(), status);
1686				match status {
1687					RoundStatus::Confirmed { funding_txid } => {
1688						info!("Round confirmed. Funding tx {}", funding_txid);
1689						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1690							warn!("Error removing confirmed round state from db: {:#}", e);
1691						}
1692					},
1693					RoundStatus::Unconfirmed { funding_txid } => {
1694						info!("Waiting for confirmations for round funding tx {}", funding_txid);
1695						if let Err(e) = self.inner.db.update_round_state(&state).await {
1696							warn!("Error updating pending round state in db: {:#}", e);
1697						}
1698					},
1699					RoundStatus::Pending => {
1700						if let Err(e) = self.inner.db.update_round_state(&state).await {
1701							warn!("Error updating pending round state in db: {:#}", e);
1702						}
1703					},
1704					RoundStatus::Failed { ref error } => {
1705						error!("Round failed: {}", error);
1706						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1707							warn!("Error removing failed round state from db: {:#}", e);
1708						}
1709					},
1710					RoundStatus::Canceled => {
1711						error!("Round canceled");
1712						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1713							warn!("Error removing canceled round state from db: {:#}", e);
1714						}
1715					},
1716				}
1717				ret.lock().insert(state.id(), status);
1718			}
1719		}).await;
1720
1721		Ok(Arc::into_inner(ret).expect("only ref left").into_inner())
1722	}
1723
1724	/// Fetch last round event from server
1725	async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1726		let (mut srv, _) = self.require_server().await?;
1727		let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1728		Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1729	}
1730
1731	async fn inner_process_event(
1732		&self,
1733		state: &mut StoredRoundState,
1734		event: Option<&RoundEvent>,
1735	) {
1736		if let Some(event) = event && state.state().ongoing_participation() {
1737			let updated = state.state_mut().process_event(self, &event).await;
1738			if updated {
1739				if let Err(e) = self.inner.db.update_round_state(&state).await {
1740					error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1741				}
1742			}
1743		}
1744
1745		match state.state_mut().sync(self).await {
1746			Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1747			Ok(s) if s.is_final() => {
1748				info!("Round #{} finished with result: {:?}", state.id(), s);
1749				if let Err(e) = self.inner.db.remove_round_state(&state).await {
1750					warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1751				}
1752			},
1753			Ok(s) => {
1754				trace!("Round state #{} is now in state {:?}", state.id(), s);
1755				if let Err(e) = self.inner.db.update_round_state(&state).await {
1756					warn!("Error storing round state #{}: {:#}", state.id(), e);
1757				}
1758			},
1759		}
1760	}
1761
1762	/// Try to make incremental progress on all pending round states
1763	///
1764	/// If the `last_round_event` argument is not provided, it will be fetched
1765	/// from the server.
1766	pub async fn progress_pending_rounds(
1767		&self,
1768		last_round_event: Option<&RoundEvent>,
1769	) -> anyhow::Result<()> {
1770		let states = self.pending_round_states().await?;
1771		if states.is_empty() {
1772			return Ok(());
1773		}
1774
1775		info!("Processing {} rounds...", states.len());
1776
1777		let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1778
1779		let has_ongoing_participation = states.iter()
1780			.any(|s| s.state().ongoing_participation());
1781		if has_ongoing_participation && last_round_event.is_none() {
1782			match self.get_last_round_event().await {
1783				Ok(e) => last_round_event = Some(Cow::Owned(e)),
1784				Err(e) => {
1785					warn!("Error fetching round event, \
1786						failed to progress ongoing rounds: {:#}", e);
1787				},
1788			}
1789		}
1790
1791		let event = last_round_event.as_ref().map(|c| c.as_ref());
1792
1793		let futs = states.into_iter().map(async |state| {
1794			let locked = self.lock_wait_round_state(state.id()).await?;
1795			if let Some(mut locked) = locked {
1796				self.inner_process_event(&mut locked, event).await;
1797			}
1798			Ok::<_, anyhow::Error>(())
1799		});
1800
1801		futures::future::join_all(futs).await;
1802
1803		Ok(())
1804	}
1805
1806	pub async fn subscribe_round_events(&self)
1807		-> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1808	{
1809		let (mut srv, _) = self.require_server().await?;
1810		let mut req = tonic::IntoRequest::into_request(protos::Empty {});
1811		req.set_timeout(SUBSCRIBE_REQUEST_TIMEOUT);
1812		let events = srv.client.subscribe_rounds(req).await?
1813			.into_inner().map(|m| {
1814				let m = m.context("received error on event stream")?;
1815				let e = RoundEvent::try_from(m.clone())
1816					.with_context(|| format!("error converting rpc round event: {:?}", m))?;
1817				trace!("Received round event: {}", e);
1818				Ok::<_, anyhow::Error>(e)
1819			});
1820		Ok(events)
1821	}
1822
1823	/// A blocking call that will try to perform a full round participation
1824	/// for all ongoing rounds
1825	///
1826	/// Returns only once there is no ongoing rounds anymore.
1827	pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1828		let mut events = self.subscribe_round_events().await?;
1829
1830		loop {
1831			// NB: we need to load all ongoing rounds on every iteration here
1832			// because some might be finished by another call
1833			let state_ids = self.pending_round_states().await?.iter()
1834				.filter(|s| s.state().ongoing_participation())
1835				.map(|s| s.id())
1836				.collect::<Vec<_>>();
1837
1838			if state_ids.is_empty() {
1839				info!("All rounds handled");
1840				return Ok(());
1841			}
1842
1843			let event = events.next().await
1844				.context("events stream broke")?
1845				.context("error on event stream")?;
1846
1847			let futs = state_ids.into_iter().map(async |state| {
1848				let locked = self.lock_wait_round_state(state).await?;
1849				if let Some(mut locked) = locked {
1850					self.inner_process_event(&mut locked, Some(&event)).await;
1851				}
1852				Ok::<_, anyhow::Error>(())
1853			});
1854
1855			futures::future::join_all(futs).await;
1856		}
1857	}
1858
1859	/// Will cancel all pending rounds that can safely be canceled
1860	///
1861	/// All rounds that have not started yet can safely be canceled,
1862	/// as well as rounds where we have not yet signed any forfeit txs.
1863	pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1864		// initial load to get all pending round states ids
1865		let state_ids = self.inner.db.get_pending_round_state_ids().await?;
1866
1867		let futures = state_ids.into_iter().map(|state_id| {
1868			async move {
1869				// wait for lock and load again to ensure most recent state
1870				let mut state = match self.lock_wait_round_state(state_id).await {
1871					Ok(Some(s)) => s,
1872					Ok(None) => return,
1873					Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1874				};
1875
1876				match state.state_mut().try_cancel(self).await {
1877					Ok(true) => {
1878						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1879							warn!("Error removing canceled round state from db: {:#}", e);
1880						}
1881					},
1882					Ok(false) => {},
1883					Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1884				}
1885			}
1886		});
1887
1888		join_all(futures).await;
1889
1890		Ok(())
1891	}
1892
1893	/// Try to cancel the given round
1894	pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1895		let mut state = self.lock_wait_round_state(id).await?
1896			.context("round state not found")?;
1897
1898		if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1899			self.inner.db.remove_round_state(&state).await
1900				.context("error removing canceled round state from db")?;
1901		} else {
1902			bail!("failed to cancel round");
1903		}
1904
1905		Ok(())
1906	}
1907
1908	/// Participate in a round
1909	///
1910	/// This function will start a new round participation and block until
1911	/// the round is finished.
1912	/// After this method returns the round state will be kept active until
1913	/// the round tx fully confirms.
1914	pub(crate) async fn participate_round(
1915		&self,
1916		participation: RoundParticipation,
1917		movement_kind: Option<RoundMovement>,
1918	) -> anyhow::Result<RoundStatus> {
1919		let state = self.join_next_round(participation, movement_kind).await?;
1920
1921		info!("Waiting for a round start...");
1922		let mut events = self.subscribe_round_events().await?;
1923
1924		self.drive_round_state(state, &mut events).await
1925	}
1926
1927	/// Drive an already-joined round state to its final [RoundStatus], blocking
1928	/// on `events` and persisting each update.
1929	///
1930	/// Shared by [Wallet::participate_round] and the blocking maintenance
1931	/// refresh: the latter submits its participation up-front (against an
1932	/// in-flight attempt) and then drives the resulting round to completion
1933	/// here.
1934	pub(crate) async fn drive_round_state<S>(
1935		&self,
1936		mut state: StoredRoundState,
1937		events: &mut S,
1938	) -> anyhow::Result<RoundStatus>
1939	where
1940		S: Stream<Item = anyhow::Result<RoundEvent>> + Unpin,
1941	{
1942		loop {
1943			if !state.state().ongoing_participation() {
1944				let status = state.state_mut().sync(self).await?;
1945				match status {
1946					RoundStatus::Failed { error } => bail!("round failed: {}", error),
1947					RoundStatus::Canceled => bail!("round canceled"),
1948					status => return Ok(status),
1949				}
1950			}
1951
1952			let event = events.next().await
1953				.context("events stream broke")?
1954				.context("error on event stream")?;
1955			if state.state_mut().process_event(self, &event).await {
1956				self.inner.db.update_round_state(&state).await?;
1957			}
1958		}
1959	}
1960}
1961
#[cfg(test)]
mod test {
	use super::*;

	use bitcoin::secp256k1::Secp256k1;

	/// Generate a fresh random keypair.
	fn keypair() -> Keypair {
		Keypair::new(&Secp256k1::new(), &mut rand::thread_rng())
	}

	/// Public key of a fresh random keypair.
	fn pubkey() -> bitcoin::secp256k1::PublicKey {
		keypair().public_key()
	}

	/// A nested list holding two secret nonces made from one random key.
	fn nonces() -> Vec<Vec<SecretNonce>> {
		let key = keypair();
		// Same shape as what start_attempt produces: the outer Vec has one
		// entry per cosign keypair, the inner Vec holds the tree-depth set
		// of pre-generated nonces.
		let first = musig::nonce_pair(&key).0;
		let second = musig::nonce_pair(&key).0;
		vec![vec![first, second]]
	}

	#[test]
	fn stash_and_take() {
		let pk = pubkey();
		let store = RoundSecretNonces::new();
		store.stash(pk, nonces());

		assert!(store.take(&pk).is_some());
	}

	#[test]
	fn cannot_take_twice() {
		let pk = pubkey();
		let store = RoundSecretNonces::new();
		store.stash(pk, nonces());

		// the first take hands out the nonces, the second finds nothing
		assert!(store.take(&pk).is_some());
		assert!(store.take(&pk).is_none());
	}

	#[test]
	fn cannot_take_after_forget() {
		let pk = pubkey();
		let store = RoundSecretNonces::new();
		store.stash(pk, nonces());
		store.forget(&pk);

		assert!(store.take(&pk).is_none());
	}

	#[test]
	fn stash_overrides_stash() {
		let key = keypair();
		let pk = key.public_key();

		let store = RoundSecretNonces::new();
		// the second stash under the same key is an empty list
		store.stash(pk, vec![vec![musig::nonce_pair(&key).0]]);
		store.stash(pk, vec![]);

		let taken = store.take(&pk).expect("nonces present");
		assert!(taken.is_empty());
	}
}