Skip to main content

bark/round/
mod.rs

1//!
2//! Round State Machine
3//!
4
5use std::collections::HashMap;
6use std::iter;
7use std::borrow::Cow;
8use std::convert::Infallible;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use anyhow::Context;
13use ark::vtxo::VtxoValidationError;
14use bdk_esplora::esplora_client::Amount;
15use bip39::rand;
16use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
17use bitcoin::consensus::encode::{deserialize, serialize_hex};
18use bitcoin::hashes::Hash;
19use bitcoin::hex::DisplayHex;
20use bitcoin::key::Keypair;
21use bitcoin::secp256k1::schnorr;
22use futures::future::join_all;
23use futures::{Stream, StreamExt};
24use log::{debug, error, info, trace, warn};
25
26use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
27use ark::vtxo::Full;
28use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
29use ark::forfeit::HashLockedForfeitBundle;
30use ark::musig::{self, PublicNonce, SecretNonce};
31use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
32use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
33use bitcoin_ext::TxStatus;
34use server_rpc::{protos, ServerConnection, TryFromBytes};
35
36use crate::movement::manager::OnDropStatus;
37use crate::{SECP, Wallet, WalletVtxo};
38use crate::movement::{MovementId, MovementStatus};
39use crate::movement::update::MovementUpdate;
40use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
41
42/// How long [`Wallet::lock_wait_round_state`] waits for a contended
43/// round lock before giving up. Long enough to outlast a normal round.
44const ROUND_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
45use crate::subsystem::{RoundMovement, Subsystem};
46
47
48/// The type string for the hArk leaf transition
49const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
50
51/// Struct to communicate your specific participation for an Ark round.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct RoundParticipation {
54	#[serde(with = "ark::encode::serde::vec")]
55	pub inputs: Vec<Vtxo<Full>>,
56	/// The output VTXOs that we request in the round,
57	/// including change
58	pub outputs: Vec<VtxoRequest>,
59	/// Optional mailbox identifier for round completion notification
60	#[serde(default, skip_serializing_if = "Option::is_none", with = "ark::encode::serde::opt")]
61	pub unblinded_mailbox_id: Option<ark::mailbox::MailboxIdentifier>,
62}
63
64impl RoundParticipation {
65	pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
66		let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
67		let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
68		let fee = input_amount - output_amount;
69		Ok(MovementUpdate::new()
70			.consumed_vtxos(&self.inputs)
71			.intended_balance(SignedAmount::ZERO)
72			.effective_balance( - fee.to_signed()?)
73			.fee(fee)
74		)
75	}
76}
77
78#[derive(Debug, Clone)]
79pub enum RoundStatus {
80	/// The round was successful and is fully confirmed
81	Confirmed {
82		funding_txid: Txid,
83	},
84	/// Round successful but not fully confirmed
85	Unconfirmed {
86		funding_txid: Txid,
87	},
88	/// Round didn't finish yet
89	Pending,
90	/// The round failed
91	Failed {
92		error: String,
93	},
94	/// User canceled the round
95	Canceled,
96}
97
98impl RoundStatus {
99	/// Whether this is the final state and it won't change anymore
100	pub fn is_final(&self) -> bool {
101		match self {
102			Self::Confirmed { .. } => true,
103			Self::Unconfirmed { .. } => false,
104			Self::Pending => false,
105			Self::Failed { .. } => true,
106			Self::Canceled => true,
107		}
108	}
109
110	/// Whether it looks like the round succeeded
111	pub fn is_success(&self) -> bool {
112		match self {
113			Self::Confirmed { .. } => true,
114			Self::Unconfirmed { .. } => true,
115			Self::Pending => false,
116			Self::Failed { .. } => false,
117			Self::Canceled => false,
118		}
119	}
120}
121
122/// State of the progress of a round participation
123///
124/// An instance of this struct is kept all the way from the intention of joining
125/// the next round, until either the round fully confirms or it fails and we are
126/// sure it won't have any effect on our wallet.
127///
128/// As soon as we have signed forfeit txs for the round, we keep track of this
129/// round attempt until we see another attempt we participated in confirm or
130/// we gain confidence that the failed attempt will never confirm.
131//
132//TODO(stevenroose) move the id in here and have the state persist itself with the wallet
133// to have better control. this way we can touch db before we sent forfeit sigs
134pub struct RoundState {
135	/// Round is fully done
136	pub(crate) done: bool,
137
138	/// Our participation in this round
139	pub(crate) participation: RoundParticipation,
140
141	/// The flow of the round in case it is still ongoing with the server
142	pub(crate) flow: RoundFlowState,
143
144	/// The new output vtxos of this round participation
145	///
146	/// After we finish the interactive part, we fill this with the uncompleted
147	/// VTXOs which we then try to complete with the unlock preimage.
148	pub(crate) new_vtxos: Vec<Vtxo<Full>>,
149
150	/// Whether we sent our forfeit signatures to the server
151	///
152	/// If we did this and the server refused to reveal our new VTXOs,
153	/// we will be forced to exit.
154	//TODO(stevenroose) implement exit when this is true and we can't make progress
155	// probably based on the input vtxos becoming close to expiry
156	pub(crate) sent_forfeit_sigs: bool,
157
158	/// The ID of the [Movement] associated with this round
159	pub(crate) movement_id: Option<MovementId>,
160}
161
162impl RoundState {
163	fn new_interactive(
164		participation: RoundParticipation,
165		movement_id: Option<MovementId>,
166	) -> Self {
167		Self {
168			participation,
169			movement_id,
170			flow: RoundFlowState::InteractivePending,
171			new_vtxos: Vec::new(),
172			sent_forfeit_sigs: false,
173			done: false,
174		}
175	}
176
177	fn new_delegated(
178		participation: RoundParticipation,
179		unlock_hash: UnlockHash,
180		movement_id: Option<MovementId>,
181	) -> Self {
182		Self {
183			participation,
184			movement_id,
185			flow: RoundFlowState::NonInteractivePending { unlock_hash },
186			new_vtxos: Vec::new(),
187			sent_forfeit_sigs: false,
188			done: false,
189		}
190	}
191
192	/// Our participation in this round
193	pub fn participation(&self) -> &RoundParticipation {
194		&self.participation
195	}
196
197	/// the unlock hash if already known
198	pub fn unlock_hash(&self) -> Option<UnlockHash> {
199		match self.flow {
200			RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
201			RoundFlowState::InteractivePending => None,
202			RoundFlowState::InteractiveOngoing { .. } => None,
203			RoundFlowState::Failed { .. } => None,
204			RoundFlowState::Canceled => None,
205			RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
206		}
207	}
208
209	pub fn funding_tx(&self) -> Option<&Transaction> {
210		match self.flow {
211			RoundFlowState::NonInteractivePending { .. } => None,
212			RoundFlowState::InteractivePending => None,
213			RoundFlowState::InteractiveOngoing { .. } => None,
214			RoundFlowState::Failed { .. } => None,
215			RoundFlowState::Canceled => None,
216			RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
217		}
218	}
219
220	/// Whether the interactive part of the round is still ongoing
221	pub fn ongoing_participation(&self) -> bool {
222		match self.flow {
223			RoundFlowState::NonInteractivePending { .. } => false,
224			RoundFlowState::InteractivePending => true,
225			RoundFlowState::InteractiveOngoing { .. } => true,
226			RoundFlowState::Failed { .. } => false,
227			RoundFlowState::Canceled => false,
228			RoundFlowState::Finished { .. } => false,
229		}
230	}
231
232	/// Tries to cancel the round and returns whether it was succesfully canceled
233	/// or if it was already canceled or failed
234	pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
235		let ret = match self.flow {
236			RoundFlowState::NonInteractivePending { .. } => {
237				//TODO(stevenroose) we have to cancel with server
238				bail!("it is currently not yet possible to cancel pending delegated rounds");
239			},
240			RoundFlowState::Canceled => true,
241			RoundFlowState::Failed { .. } => true,
242			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
243				self.flow = RoundFlowState::Canceled;
244				true
245			},
246			RoundFlowState::Finished { .. } => false,
247		};
248		if ret {
249			persist_round_failure(wallet, &self.participation, self.movement_id).await
250				.context("failed to persist round failure for cancelation")?;
251		}
252		Ok(ret)
253	}
254
255	async fn try_start_attempt(
256		&mut self,
257		wallet: &Wallet,
258		attempt: &RoundAttempt,
259	) {
260		// Drop the previous attempt's stashed nonces: the new attempt
261		// regenerates cosign keys, so the old key becomes unreachable.
262		if let RoundFlowState::InteractiveOngoing {
263			state: AttemptState::AwaitingUnsignedVtxoTree { ref cosign_keys, .. },
264			..
265		} = self.flow {
266			if let Some(k) = cosign_keys.first() {
267				wallet.inner.round_secret_nonces.forget(&k.public_key());
268			}
269		}
270
271		match start_attempt(wallet, &self.participation, attempt).await {
272			Ok(state) => {
273				self.flow = RoundFlowState::InteractiveOngoing {
274					round_seq: attempt.round_seq,
275					attempt_seq: attempt.attempt_seq,
276					state: state,
277				};
278			},
279			Err(e) => {
280				self.flow = RoundFlowState::Failed {
281					error: format!("{:#}", e),
282				};
283			},
284		}
285	}
286
287	/// Processes the given event and returns true if some update was made to the state
288	pub async fn process_event(
289		&mut self,
290		wallet: &Wallet,
291		event: &RoundEvent,
292	) -> bool {
293		let _: Infallible = match self.flow {
294			RoundFlowState::InteractivePending => {
295				if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
296					trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
297					self.try_start_attempt(wallet, e).await;
298					return true;
299				} else {
300					trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
301						event.kind(), event.round_seq(), event.attempt_seq(),
302					);
303					return false;
304				}
305			},
306			RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
307				// here we catch the cases where we're in a wrong flow
308
309				if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
310					warn!("Round {} failed by server", round_seq);
311					self.flow = RoundFlowState::Failed {
312						error: format!("round {} failed by server", round_seq),
313					};
314					return true;
315				}
316
317				if event.round_seq() > round_seq {
318					// new round started, we don't support multiple parallel rounds,
319					// this means we failed
320					self.flow = RoundFlowState::Failed {
321						error: format!("round {} started while we were on {}",
322							event.round_seq(), round_seq,
323						),
324					};
325					return true;
326				}
327
328				if event.attempt_seq() < attempt_seq {
329					trace!("ignoring replayed message from old attempt");
330					return false;
331				}
332
333				if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
334					trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
335					self.try_start_attempt(wallet, e).await;
336					return true;
337				}
338				trace!("Processing event {} for round attempt {}:{} in state {}",
339					event.kind(), round_seq, attempt_seq, state.kind(),
340				);
341
342				return match progress_attempt(state, wallet, &self.participation, event).await {
343					AttemptProgressResult::NotUpdated => false,
344					AttemptProgressResult::Updated { new_state } => {
345						*state = new_state;
346						true
347					},
348					AttemptProgressResult::Failed(e) => {
349						warn!("Round failed with error: {:#}", e);
350						self.flow = RoundFlowState::Failed {
351							error: format!("{:#}", e),
352						};
353						true
354					},
355					AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
356						self.new_vtxos = vtxos;
357						let funding_txid = funding_tx.compute_txid();
358						self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
359						if let Some(mid) = self.movement_id {
360							if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
361								warn!("Error updating the round funding txid: {:#}", e);
362							}
363						}
364						true
365					},
366				};
367			},
368			RoundFlowState::NonInteractivePending { .. }
369				| RoundFlowState::Finished { .. }
370				| RoundFlowState::Failed { .. }
371				| RoundFlowState::Canceled => return false,
372		};
373	}
374
375	/// Sync the round's status and return it
376	///
377	/// When success or failure is returned, the round state can be eliminated
378	//TODO(stevenroose) make RoundState manage its own db record
379	pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
380		match self.flow {
381			RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
382				Ok(RoundStatus::Confirmed {
383					funding_txid: funding_tx.compute_txid(),
384				})
385			},
386
387			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
388				Ok(RoundStatus::Pending)
389			},
390			RoundFlowState::Failed { ref error } => {
391				persist_round_failure(wallet, &self.participation, self.movement_id).await
392					.context("failed to persist round failure")?;
393				Ok(RoundStatus::Failed { error: error.clone() })
394			},
395			RoundFlowState::Canceled => {
396				persist_round_failure(wallet, &self.participation, self.movement_id).await
397					.context("failed to persist round failure")?;
398				Ok(RoundStatus::Canceled)
399			},
400
401			RoundFlowState::NonInteractivePending { unlock_hash } => {
402				match progress_delegated(wallet, &self.participation, unlock_hash).await {
403					Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
404					Ok(HarkProgressResult::RoundNotFound) => {
405						self.handle_round_not_found(wallet).await
406					},
407					Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
408						let funding_txid = funding_tx.compute_txid();
409						self.new_vtxos = new_vtxos;
410						self.flow = RoundFlowState::Finished {
411							funding_tx: funding_tx.clone(),
412							unlock_hash: unlock_hash,
413						};
414
415						persist_round_success(
416							wallet,
417							&self.participation,
418							self.movement_id,
419							&self.new_vtxos,
420							&funding_tx,
421						).await.context("failed to store successful round in DB!")?;
422
423						self.done = true;
424
425						Ok(RoundStatus::Confirmed { funding_txid })
426					},
427					Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
428						if let Some(mid) = self.movement_id {
429							update_funding_txid(wallet, mid, funding_txid).await
430								.context("failed to update funding txid in DB")?;
431						}
432						Ok(RoundStatus::Unconfirmed { funding_txid })
433					},
434
435					//TODO(stevenroose) should we mark as failed for these cases?
436
437					Err(HarkForfeitError::Err(e)) => {
438						//TODO(stevenroose) we failed here but we might actualy be able to
439						// succeed if we retry. should we implement some kind of limited
440						// retry after which we mark as failed?
441						Err(e.context("error progressing delegated round"))
442					},
443					Err(HarkForfeitError::SentForfeits(e)) => {
444						self.sent_forfeit_sigs = true;
445						Err(e.context("error progressing delegated round \
446							after sending forfeit tx signatures"))
447					},
448				}
449			},
450			// interactive part finished, but didn't forfeit yet
451			RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
452				let funding_txid = funding_tx.compute_txid();
453				let confirmed = check_funding_tx_confirmations(
454					wallet, funding_txid, &funding_tx,
455				).await.context("error checking funding tx confirmations")?;
456				if !confirmed {
457					trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
458					return Ok(RoundStatus::Unconfirmed { funding_txid });
459				}
460
461				match hark_vtxo_swap(
462					wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
463				).await {
464					Ok(()) => {
465						persist_round_success(
466							wallet,
467							&self.participation,
468							self.movement_id,
469							&self.new_vtxos,
470							&funding_tx,
471						).await.context("failed to store successful round in DB!")?;
472
473						self.done = true;
474
475						Ok(RoundStatus::Confirmed { funding_txid })
476					},
477					Err(HarkForfeitError::Err(e)) => {
478						Err(e.context("error forfeiting VTXOs after round"))
479					},
480					Err(HarkForfeitError::SentForfeits(e)) => {
481						self.sent_forfeit_sigs = true;
482						Err(e.context("error after having signed and sent \
483							forfeit signatures to server"))
484					},
485				}
486			},
487		}
488	}
489
490	/// Once we know the signed round funding tx, this returns the output VTXOs
491	/// for this round.
492	pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
493		if self.new_vtxos.is_empty() {
494			None
495		} else {
496			Some(&self.new_vtxos)
497		}
498	}
499
500	/// Returns the input VTXOs that are locked in this round, but only
501	/// if no output VTXOs were issued yet.
502	pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
503		//TODO(stevenroose) consider if we can't just drop the state after forfeit exchange
504		match self.flow {
505			RoundFlowState::NonInteractivePending { .. }
506				| RoundFlowState::InteractivePending
507				| RoundFlowState::InteractiveOngoing { .. }
508			=> {
509				&self.participation.inputs
510			},
511			RoundFlowState::Finished { .. } => if self.done {
512				// inputs already unlocked
513				&[]
514			} else {
515				&self.participation.inputs
516			},
517			RoundFlowState::Failed { .. }
518				| RoundFlowState::Canceled
519			=> {
520				// inputs already unlocked
521				&[]
522			},
523		}
524	}
525
526	/// The balance pending in this round
527	///
528	/// This becomes zero once the new round VTXOs are unlocked.
529	pub fn pending_balance(&self) -> Amount {
530		if self.done {
531			return Amount::ZERO;
532		}
533
534		match self.flow {
535			RoundFlowState::NonInteractivePending { .. }
536				| RoundFlowState::InteractivePending
537				| RoundFlowState::InteractiveOngoing { .. }
538				| RoundFlowState::Finished { .. }
539			=> {
540				self.participation.outputs.iter().map(|o| o.amount).sum()
541			},
542			RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
543				Amount::ZERO
544			},
545		}
546	}
547
548	/// Handle the case where the server reports our round participation as not found.
549	///
550	/// If we sent forfeit signatures (which only happens after the round was
551	/// confirmed), this is adversarial — trigger unilateral exit.
552	/// If we never sent forfeits, the server can't steal, so unlock the VTXOs
553	/// and verify with the server that they're still considered spendable.
554	async fn handle_round_not_found(
555		&mut self,
556		wallet: &Wallet,
557	) -> anyhow::Result<RoundStatus> {
558		info!("Server reports round participation not found (no forfeits sent)");
559		self.flow = RoundFlowState::Failed {
560			error: "server reports round participation not found".into(),
561		};
562		persist_round_failure(wallet, &self.participation, self.movement_id).await
563			.context("failed to persist round failure")?;
564
565		Ok(RoundStatus::Failed {
566			error: "server reports round participation not found".into(),
567		})
568	}
569}
570
571/// The state of the process flow of a round
572///
573/// This tracks the progress of the interactive part of the round, from
574/// waiting to start until finishing either succesfully or with a failure.
575pub enum RoundFlowState {
576	/// We don't do flow and we just wait for the round to finish
577	NonInteractivePending {
578		unlock_hash: UnlockHash,
579	},
580
581	/// Waiting for round to happen
582	InteractivePending,
583	/// Interactive part ongoing
584	InteractiveOngoing {
585		round_seq: RoundSeq,
586		attempt_seq: usize,
587		state: AttemptState,
588	},
589
590	/// Interactive part finished, waiting for confirmation
591	Finished {
592		funding_tx: Transaction,
593		unlock_hash: UnlockHash,
594	},
595
596	/// Failed during round
597	Failed {
598		error: String,
599	},
600
601	/// User canceled round
602	Canceled,
603}
604
605/// The state of a single round attempt
606///
607/// For each attempt that we participate in, we keep the state of our concrete
608/// participation.
609pub enum AttemptState {
610	AwaitingAttempt,
611	AwaitingUnsignedVtxoTree {
612		cosign_keys: Vec<Keypair>,
613		unlock_hash: UnlockHash,
614	},
615	AwaitingFinishedRound {
616		unsigned_round_tx: Transaction,
617		vtxos_spec: VtxoTreeSpec,
618		unlock_hash: UnlockHash,
619	},
620}
621
622impl AttemptState {
623	/// The state kind represented as a string
624	fn kind(&self) -> &'static str {
625		match self {
626			Self::AwaitingAttempt => "AwaitingAttempt",
627			Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
628			Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
629		}
630	}
631}
632
633/// Result from trying to progress an ongoing round attempt
634enum AttemptProgressResult {
635	Finished {
636		funding_tx: Transaction,
637		vtxos: Vec<Vtxo<Full>>,
638		unlock_hash: UnlockHash,
639	},
640	Failed(anyhow::Error),
641	/// When the state changes, this variant is returned
642	///
643	/// If during the processing, we have signed any forfeit txs and tried
644	/// sending them to the server, the [UnconfirmedRound] instance is returned
645	/// so that it can be stored in the state.
646	Updated {
647		new_state: AttemptState,
648	},
649	NotUpdated,
650}
651
652/// Participate in the new round attempt by submitting our round participation
653async fn start_attempt(
654	wallet: &Wallet,
655	participation: &RoundParticipation,
656	event: &RoundAttempt,
657) -> anyhow::Result<AttemptState> {
658	let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
659
660	// Assign cosign pubkeys to the payment requests.
661	let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
662		.take(participation.outputs.len())
663		.collect::<Vec<_>>();
664
665	// Prepare round participation info.
666	// For each of our requested vtxo output, we need a set of public and secret nonces.
667	let cosign_nonces = cosign_keys.iter()
668		.map(|key| {
669			let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
670			let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
671			for _ in 0..ark_info.nb_round_nonces {
672				let (s, p) = musig::nonce_pair(key);
673				secs.push(s);
674				pubs.push(p);
675			}
676			(secs, pubs)
677		})
678		.take(participation.outputs.len())
679		.collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
680
681
682	// The round has now started. We can submit our payment.
683	debug!("Submitting payment request with {} inputs and {} vtxo outputs",
684		participation.inputs.len(), participation.outputs.len(),
685	);
686
687	// Build signed requests with mailbox IDs
688	let unblinded_mailbox_id = wallet.mailbox_identifier();
689	let signed_reqs = participation.outputs.iter()
690		.zip(cosign_keys.iter())
691		.zip(cosign_nonces.iter())
692		.map(|((req, cosign_key), (_sec, pub_nonces))| {
693			SignedVtxoRequest {
694				vtxo: req.clone(),
695				cosign_pubkey: cosign_key.public_key(),
696				nonces: pub_nonces.clone(),
697			}
698		})
699		.collect::<Vec<_>>();
700
701	let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
702	for vtxo in participation.inputs.iter() {
703		let keypair = wallet.get_vtxo_key(vtxo).await
704			.map_err(HarkForfeitError::Err)?;
705		input_vtxos.push(protos::InputVtxo {
706			vtxo_id: vtxo.id().to_bytes().to_vec(),
707			attestation: {
708				let attestation = RoundAttemptAttestation::new(
709					event.challenge, vtxo.id(), &signed_reqs, &keypair,
710				);
711				attestation.serialize()
712			},
713		});
714	}
715
716	// Register VTXO transaction chains with server before round participation
717	wallet.register_vtxo_transactions_with_server(&participation.inputs).await
718		.map_err(HarkForfeitError::Err)?;
719
720	let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
721		input_vtxos: input_vtxos,
722		vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
723		#[allow(deprecated)]
724		offboard_requests: vec![],
725		unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
726	}).await.context("Ark server refused our payment submission")?;
727	let unlock_hash = UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?;
728
729	// Stash nonces in memory only. Empty `cosign_keys` means no VTXO
730	// outputs (offboard-only) — nothing to stash.
731	if let Some(k) = cosign_keys.first() {
732		wallet.inner.round_secret_nonces.stash(
733			k.public_key(),
734			cosign_nonces.into_iter().map(|(sec, _pub)| sec).collect(),
735		);
736	}
737
738	Ok(AttemptState::AwaitingUnsignedVtxoTree { unlock_hash, cosign_keys })
739}
740
741/// just an internal type; need Error trait to work with anyhow
742#[derive(Debug, thiserror::Error)]
743enum HarkForfeitError {
744	/// An error happened after we sent forfeit signatures to the server
745	#[error("error after forfeits were sent")]
746	SentForfeits(#[source] anyhow::Error),
747	/// An error happened before we sent forfeit signatures to the server
748	#[error("error before forfeits were sent")]
749	Err(#[source] anyhow::Error),
750}
751
752async fn hark_cosign_leaf(
753	wallet: &Wallet,
754	srv: &mut ServerConnection,
755	funding_tx: &Transaction,
756	vtxo: &mut Vtxo<Full>,
757) -> anyhow::Result<()> {
758	let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
759		.context("error fetching keypair").map_err(HarkForfeitError::Err)?
760		.with_context(|| format!(
761			"keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
762		))?.1;
763	let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
764	let cosign_resp = srv.client.request_leaf_vtxo_cosign(
765		protos::LeafVtxoCosignRequest::from(cosign_req),
766	).await
767		.with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
768		.into_inner().try_into()
769		.context("bad leaf vtxo cosign response")?;
770	ensure!(ctx.finalize(vtxo, cosign_resp),
771		"failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
772	);
773
774	Ok(())
775}
776
777/// Finish the hArk VTXO swap protocol
778///
779/// This includes:
780/// - requesting cosignature of the locked hArk leaves
781/// - sending forfeit txs to the server in return for the unlock preimage
782///
783/// NB all the actions in this function are idempotent, meaning that the server
784/// allows them to be done multiple times. this means that if this function calls
785/// fails, it's safe to just call it again at a later time
786async fn hark_vtxo_swap(
787	wallet: &Wallet,
788	participation: &RoundParticipation,
789	output_vtxos: &mut [Vtxo<Full>],
790	funding_tx: &Transaction,
791	unlock_hash: UnlockHash,
792) -> Result<(), HarkForfeitError> {
793	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
794
795	// before we start make sure the server has our input vtxo signatures
796	wallet.register_vtxo_transactions_with_server(&participation.inputs).await
797		.context("couldn't send our input vtxo transactions to server")
798		.map_err(HarkForfeitError::Err)?;
799
800	// first get the leaves signed
801	for vtxo in output_vtxos.iter_mut() {
802		hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
803			.map_err(HarkForfeitError::Err)?;
804	}
805
806	// then do the forfeit dance
807
808	let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
809		unlock_hash: unlock_hash.to_byte_array().to_vec(),
810		vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
811	}).await
812		.context("request forfeits nonces call failed")
813		.map_err(HarkForfeitError::Err)?
814		.into_inner().public_nonces.into_iter()
815		.map(|b| musig::PublicNonce::from_bytes(b))
816		.collect::<Result<Vec<_>, _>>()
817		.context("invalid forfeit nonces")
818		.map_err(HarkForfeitError::Err)?;
819
820	if server_nonces.len() != participation.inputs.len() {
821		return Err(HarkForfeitError::Err(anyhow!(
822			"server sent {} nonce pairs, expected {}",
823			server_nonces.len(), participation.inputs.len(),
824		)));
825	}
826
827	let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
828	for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
829		let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
830			.ok().flatten().with_context(|| format!(
831				"failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
832			)).map_err(HarkForfeitError::Err)?.1;
833		forfeit_bundles.push(HashLockedForfeitBundle::new(
834			input, unlock_hash, &user_key, &nonces,
835		))
836	}
837
838	let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
839		forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
840	}).await
841		.context("forfeit vtxos call failed")
842		.map_err(HarkForfeitError::SentForfeits)?
843		.into_inner().unlock_preimage.as_slice().try_into()
844		.context("invalid preimage length")
845		.map_err(HarkForfeitError::SentForfeits)?;
846
847	for vtxo in output_vtxos.iter_mut() {
848		if !vtxo.provide_unlock_preimage(preimage) {
849			return Err(HarkForfeitError::SentForfeits(anyhow!(
850				"invalid preimage {} for vtxo {} with supposed unlock hash {}",
851				preimage.as_hex(), vtxo.id(), unlock_hash,
852			)));
853		}
854
855		// then validate the vtxo works
856		vtxo.validate(&funding_tx).with_context(|| format!(
857			"new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
858		)).map_err(HarkForfeitError::SentForfeits)?;
859	}
860
861	Ok(())
862}
863
864fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
865	match vtxo.validate(funding_tx) {
866		Err(VtxoValidationError::GenesisTransition {
867			genesis_idx, genesis_len, transition_kind, ..
868		}) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
869		Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
870			vtxo.serialize_hex(),
871		)),
872		Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
873	}
874}
875
876fn check_round_matches_participation(
877	part: &RoundParticipation,
878	new_vtxos: &[Vtxo<Full>],
879	funding_tx: &Transaction,
880) -> anyhow::Result<()> {
881	ensure!(new_vtxos.len() == part.outputs.len(),
882		"unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
883	);
884
885	for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
886		ensure!(vtxo.amount() == req.amount,
887			"unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
888		);
889		ensure!(*vtxo.policy() == req.policy,
890			"unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
891		);
892
893		// We accept the VTXO if only the hArk transition (last) failure happens
894		check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
895	}
896
897	Ok(())
898}
899
900/// Check the confirmation status of a funding tx
901///
902/// Returns true if the funding tx is confirmed deeply enough for us to accept it.
903/// The required number of confirmations depends on the wallet's configuration.
904///
905/// Returns false if the funding tx seems valid but not confirmed yet.
906///
907/// Returns an error if the chain source fails or if we can't submit the tx to the
908/// mempool, suggesting it might be double spent.
909async fn check_funding_tx_confirmations(
910	wallet: &Wallet,
911	funding_txid: Txid,
912	funding_tx: &Transaction,
913) -> anyhow::Result<bool> {
914	let tip = wallet.inner.chain.tip().await.context("chain source error")?;
915	let conf_height = tip - wallet.inner.config.round_tx_required_confirmations + 1;
916	let tx_status = wallet.inner.chain.tx_status(funding_txid).await.context("chain source error")?;
917	trace!("Round funding tx {} confirmation status: {:?} (tip={})",
918		funding_txid, tx_status, tip,
919	);
920	match tx_status {
921		TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
922		TxStatus::Mempool | TxStatus::Confirmed(_) => {
923			if wallet.inner.config.round_tx_required_confirmations == 0 {
924				debug!("Accepting round funding tx without confirmations because of configuration");
925				Ok(true)
926			} else {
927				trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
928				Ok(false)
929			}
930		},
931		TxStatus::NotFound => {
932			// let's try to submit it to our mempool
933			//TODO(stevenroose) change this to an explicit "testmempoolaccept" so that we can
934			// reliably distinguish the cases of our chain source having issues and the tx
935			// actually being rejected which suggests the round was double-spent
936			if let Err(e) = wallet.inner.chain.broadcast_tx(&funding_tx).await {
937				Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
938					funding_txid, serialize_hex(funding_tx), e,
939				))
940			} else {
941				trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
942				Ok(false)
943			}
944		},
945	}
946}
947
948enum HarkProgressResult {
949	RoundPending,
950	RoundNotFound,
951	FundingTxUnconfirmed {
952		funding_txid: Txid,
953	},
954	Ok {
955		funding_tx: Transaction,
956		new_vtxos: Vec<Vtxo<Full>>,
957	},
958}
959
960async fn progress_delegated(
961	wallet: &Wallet,
962	participation: &RoundParticipation,
963	unlock_hash: UnlockHash,
964) -> Result<HarkProgressResult, HarkForfeitError> {
965	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
966
967	let resp = match srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
968		unlock_hash: unlock_hash.to_byte_array().to_vec(),
969	}).await {
970		Ok(resp) => resp.into_inner(),
971		Err(err) if err.code() == tonic::Code::NotFound => {
972			return Ok(HarkProgressResult::RoundNotFound);
973		},
974		Err(err) => {
975			return Err(HarkForfeitError::Err(
976				anyhow::Error::from(err).context("error checking round participation status"),
977			));
978		},
979	};
980	let status = protos::RoundParticipationStatus::try_from(resp.status)
981		.context("unknown status from server")
982		.map_err(HarkForfeitError::Err)	?;
983
984	if status == protos::RoundParticipationStatus::RoundPartPending {
985		trace!("Hark round still pending");
986		return Ok(HarkProgressResult::RoundPending);
987	}
988
989	// Since we got here, we clearly don't think we're finished.
990	// So even if the server thinks we did the dance before, we need the
991	// cosignature on the leaf tx so we need to do the dance again.
992	// "Guilty feet have got no rhythm."
993	if status == protos::RoundParticipationStatus::RoundPartReleased {
994		let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
995		warn!("Server says preimage was already released for hArk participation \
996			with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
997		);
998	}
999
1000	let funding_tx_bytes = resp.round_funding_tx
1001		.context("funding txid should be provided when status is not pending")
1002		.map_err(HarkForfeitError::Err)?;
1003	let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
1004		.context("invalid funding txid")
1005		.map_err(HarkForfeitError::Err)?;
1006	let funding_txid = funding_tx.compute_txid();
1007	trace!("Funding tx for round participation with unlock hash {}: {} ({})",
1008		unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
1009	);
1010
1011	// Check the confirmation status of the funding tx
1012	match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
1013		Ok(true) => {},
1014		Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
1015		Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
1016	}
1017
1018	let mut new_vtxos = resp.output_vtxos.into_iter()
1019		.map(|v| <Vtxo<Full>>::deserialize(&v))
1020		.collect::<Result<Vec<_>, _>>()
1021		.context("invalid output VTXOs from server")
1022		.map_err(HarkForfeitError::Err)?;
1023
1024	// Check that the vtxos match our participation in the exact order
1025	check_round_matches_participation(participation, &new_vtxos, &funding_tx)
1026		.context("new VTXOs received from server don't match our participation")
1027		.map_err(HarkForfeitError::Err)?;
1028
1029	hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
1030		.context("error forfeiting hArk VTXOs")
1031		.map_err(HarkForfeitError::SentForfeits)?;
1032
1033	Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
1034}
1035
1036async fn progress_attempt(
1037	state: &mut AttemptState,
1038	wallet: &Wallet,
1039	part: &RoundParticipation,
1040	event: &RoundEvent,
1041) -> AttemptProgressResult {
1042	// we will match only the states and messages required to make progress,
1043	// all else we ignore, except an unexpected finish
1044
1045	match (state, event) {
1046
1047		(
1048			AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, unlock_hash },
1049			RoundEvent::VtxoProposal(e),
1050		) => {
1051			trace!("Received VtxoProposal: {:#?}", e);
1052
1053			// Missing nonces means we restarted before signing —
1054			// abandon the attempt rather than reuse on retry.
1055			let secret_nonces = if let Some(first) = cosign_keys.first() {
1056				match wallet.inner.round_secret_nonces.take(&first.public_key()) {
1057					Some(n) => n,
1058					None => return AttemptProgressResult::Failed(anyhow!(
1059						"secret cosign nonces unavailable (likely after a restart); \
1060						 abandoning round attempt to avoid nonce reuse",
1061					)),
1062				}
1063			} else {
1064				vec![]
1065			};
1066
1067			match sign_vtxo_tree(
1068				wallet,
1069				part,
1070				&cosign_keys,
1071				secret_nonces,
1072				&e.unsigned_round_tx,
1073				&e.vtxos_spec,
1074				&e.cosign_agg_nonces,
1075			).await {
1076				Ok(()) => {
1077					AttemptProgressResult::Updated {
1078						new_state: AttemptState::AwaitingFinishedRound {
1079							unsigned_round_tx: e.unsigned_round_tx.clone(),
1080							vtxos_spec: e.vtxos_spec.clone(),
1081							unlock_hash: *unlock_hash,
1082						},
1083					}
1084				},
1085				Err(e) => {
1086					trace!("Error signing VTXO tree: {:#}", e);
1087					AttemptProgressResult::Failed(e)
1088				},
1089			}
1090		},
1091
1092		(
1093			AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1094			RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1095		) => {
1096			if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1097				return AttemptProgressResult::Failed(anyhow!(
1098					"signed funding tx ({}) doesn't match tx received before ({})",
1099					signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1100				));
1101			}
1102
1103			if let Err(e) = wallet.inner.chain.broadcast_tx(&signed_round_tx).await {
1104				warn!("Failed to broadcast signed round tx: {:#}", e);
1105			}
1106
1107			match construct_new_vtxos(
1108				part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1109			).await {
1110				Ok(v) => AttemptProgressResult::Finished {
1111					funding_tx: signed_round_tx.clone(),
1112					vtxos: v,
1113					unlock_hash: *unlock_hash,
1114				},
1115				Err(e) => AttemptProgressResult::Failed(anyhow!(
1116					"failed to construct new VTXOs for round: {:#}", e,
1117				)),
1118			}
1119		},
1120
1121		(state, RoundEvent::Finished(RoundFinished { .. })) => {
1122			AttemptProgressResult::Failed(anyhow!(
1123				"unexpectedly received a finished round while we were in state {}",
1124				state.kind(),
1125			))
1126		},
1127
1128		(state, _) => {
1129			trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1130			AttemptProgressResult::NotUpdated
1131		},
1132	}
1133}
1134
1135async fn sign_vtxo_tree(
1136	wallet: &Wallet,
1137	participation: &RoundParticipation,
1138	cosign_keys: &[Keypair],
1139	secret_nonces: Vec<Vec<SecretNonce>>,
1140	unsigned_round_tx: &Transaction,
1141	vtxo_tree: &VtxoTreeSpec,
1142	cosign_agg_nonces: &[musig::AggregatedNonce],
1143) -> anyhow::Result<()> {
1144	let (mut srv, _) = wallet.require_server().await.context("server not available")?;
1145
1146	let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1147
1148	// Check that the proposal contains our inputs.
1149	let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1150	for vtxo_req in vtxo_tree.iter_vtxos() {
1151		if let Some(i) = my_vtxos.iter().position(|v| {
1152			v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1153		}) {
1154			my_vtxos.swap_remove(i);
1155		}
1156	}
1157	if !my_vtxos.is_empty() {
1158		bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1159	}
1160
1161	let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1162	trace!("Sending vtxo signatures to server...");
1163	// Sequential: SecretNonce is consume-once and not Clone, so we move
1164	// one Vec<SecretNonce> into cosign_branch per output. Going parallel
1165	// would require sharing the server connection across futures, which
1166	// isn't worth the complexity for a per-output RPC.
1167	for ((req, key), sec) in participation.outputs.iter().zip(cosign_keys).zip(secret_nonces) {
1168		let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1169		let part_sigs = unsigned_vtxos.cosign_branch(
1170			&cosign_agg_nonces, leaf_idx, key, sec,
1171		).context("failed to cosign branch: our request not part of tree")?;
1172
1173		info!("Sending {} partial vtxo cosign signatures for pk {}",
1174			part_sigs.len(), key.public_key(),
1175		);
1176
1177		srv.client.provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1178			pubkey: key.public_key().serialize().to_vec(),
1179			signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1180		}).await.context("error sending vtxo signatures")?;
1181	}
1182	trace!("Done sending vtxo signatures to server");
1183
1184	Ok(())
1185}
1186
1187async fn construct_new_vtxos(
1188	participation: &RoundParticipation,
1189	unsigned_round_tx: &Transaction,
1190	vtxo_tree: &VtxoTreeSpec,
1191	vtxo_cosign_sigs: &[schnorr::Signature],
1192) -> anyhow::Result<Vec<Vtxo<Full>>> {
1193	let round_txid = unsigned_round_tx.compute_txid();
1194	let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1195	let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1196
1197	// Validate the vtxo tree and cosign signatures.
1198	if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1199		// bad server!
1200		bail!("Received incorrect vtxo cosign signatures from server");
1201	}
1202
1203	let signed_vtxos = vtxo_tree
1204		.into_signed_tree(vtxo_cosign_sigs.to_vec())
1205		.into_cached_tree();
1206
1207	let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1208	let total_nb_expected_vtxos = expected_vtxos.len();
1209
1210	let mut new_vtxos = vec![];
1211	for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1212		if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1213			let vtxo = signed_vtxos.build_vtxo(idx);
1214
1215			// validate the received vtxos
1216			// This is more like a sanity check since we crafted them ourselves.
1217			check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1218				.context("constructed invalid vtxo from tree")?;
1219
1220			info!("New VTXO from round: {} ({}, {})",
1221				vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1222			);
1223
1224			new_vtxos.push(vtxo);
1225			expected_vtxos.swap_remove(expected_idx);
1226		}
1227	}
1228
1229	if !expected_vtxos.is_empty() {
1230		if expected_vtxos.len() == total_nb_expected_vtxos {
1231			// we must have done something wrong
1232			bail!("None of our VTXOs were present in round!");
1233		} else {
1234			bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1235				expected_vtxos.len(), expected_vtxos,
1236			);
1237		}
1238	}
1239	Ok(new_vtxos)
1240}
1241
1242//TODO(stevenroose) should be made idempotent
1243async fn persist_round_success(
1244	wallet: &Wallet,
1245	participation: &RoundParticipation,
1246	movement_id: Option<MovementId>,
1247	new_vtxos: &[Vtxo<Full>],
1248	funding_tx: &Transaction,
1249) -> anyhow::Result<()> {
1250	debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1251		new_vtxos.len(), movement_id,
1252	);
1253
1254	// we first try all actions that need to happen and only afterwards return errors
1255	// so that we achieve maximum success
1256
1257	let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1258		.context("failed to store new VTXOs");
1259	let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1260		.context("failed to mark input VTXOs as spent");
1261	let update_result = if let Some(mid) = movement_id {
1262		wallet.inner.movements.finish_movement_with_update(
1263			mid,
1264			MovementStatus::Successful,
1265			MovementUpdate::new()
1266				.produced_vtxos(new_vtxos)
1267				.metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1268		).await.context("failed to mark movement as finished")
1269	} else {
1270		Ok(())
1271	};
1272
1273	store_result?;
1274	spent_result?;
1275	update_result?;
1276
1277	Ok(())
1278}
1279
1280async fn persist_round_failure(
1281	wallet: &Wallet,
1282	participation: &RoundParticipation,
1283	movement_id: Option<MovementId>,
1284) -> anyhow::Result<()> {
1285	debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1286	let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1287	let finish_result = if let Some(movement_id) = movement_id {
1288		wallet.inner.movements.finish_movement(movement_id, MovementStatus::Failed).await
1289	} else {
1290		Ok(())
1291	};
1292	if let Err(e) = &finish_result {
1293		error!("Failed to mark movement as failed: {:#}", e);
1294	}
1295	match (unlock_result, finish_result) {
1296		(Ok(()), Ok(())) => Ok(()),
1297		(Err(e), _) => Err(e),
1298		(_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1299	}
1300}
1301
1302async fn update_funding_txid(
1303	wallet: &Wallet,
1304	movement_id: MovementId,
1305	funding_txid: Txid,
1306) -> anyhow::Result<()> {
1307	wallet.inner.movements.update_movement(
1308		movement_id,
1309		MovementUpdate::new()
1310			.metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1311	).await.context("Unable to update funding txid of round")
1312}
1313
1314/// In-memory store for MuSig2 secret cosign nonces used during round
1315/// signing. Entries are keyed by the first cosign pubkey of each round
1316/// attempt — that pubkey is freshly generated in [`start_attempt`],
1317/// uniquely identifies the attempt within a process, and is reachable
1318/// from the persisted `AttemptState::AwaitingUnsignedVtxoTree`.
1319///
1320/// Nonces never touch disk: persisting them risks signing twice with
1321/// the same nonce, which is unsafe with MuSig2.
1322#[derive(Default)]
1323pub struct RoundSecretNonces {
1324	inner: parking_lot::Mutex<HashMap<bitcoin::secp256k1::PublicKey, Vec<Vec<SecretNonce>>>>,
1325}
1326
1327impl RoundSecretNonces {
1328	pub fn new() -> Self {
1329		Self { inner: parking_lot::Mutex::new(HashMap::new()) }
1330	}
1331
1332	/// Insert nonces under the given key, replacing any previous entry.
1333	pub fn stash(
1334		&self,
1335		first_cosign_pubkey: bitcoin::secp256k1::PublicKey,
1336		nonces: Vec<Vec<SecretNonce>>,
1337	) {
1338		self.inner.lock().insert(first_cosign_pubkey, nonces);
1339	}
1340
1341	/// Remove and return the nonces stashed under the given key.
1342	/// `None` after a process restart or if the entry was never stashed.
1343	pub fn take(
1344		&self,
1345		first_cosign_pubkey: &bitcoin::secp256k1::PublicKey,
1346	) -> Option<Vec<Vec<SecretNonce>>> {
1347		self.inner.lock().remove(first_cosign_pubkey)
1348	}
1349
1350	/// Drop the entry under the given key without returning its
1351	/// contents. Use when a stashed attempt is being replaced by a new
1352	/// one and its key would otherwise be unreachable.
1353	pub fn forget(&self, first_cosign_pubkey: &bitcoin::secp256k1::PublicKey) {
1354		self.inner.lock().remove(first_cosign_pubkey);
1355	}
1356}
1357
1358impl Wallet {
1359	/// Load and lock a single given round state (by id), waiting for the lock.
1360	///
1361	/// Returns `Some(state)` if the round state is found and locked, `None`
1362	/// if it is not found after acquiring the lock.
1363	pub async fn lock_wait_round_state(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState>> {
1364		let guard = self.inner.lock_manager.lock(
1365			&format!("{}.round.{}", self.fingerprint(), id),
1366			ROUND_LOCK_TIMEOUT,
1367		).await.with_context(|| format!(
1368			"timed out waiting for lock on round state {} (wallet {})",
1369			id, self.fingerprint(),
1370		))?;
1371
1372		if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1373			return Ok(Some(state.lock(guard)));
1374		}
1375
1376		Ok(None)
1377	}
1378
1379	/// Ask the server when the next round is scheduled to start
1380	pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1381		let (mut srv, _) = self.require_server().await?;
1382		let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1383		Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1384	}
1385
1386	/// Start a new round participation
1387	///
1388	/// This function will store the state in the db and mark the VTXOs as locked.
1389	///
1390	/// ### Return
1391	///
1392	/// - By default, the returned state will be locked to prevent race conditions.
1393	/// To unlock the state, [StoredRoundState::unlock()] can be called.
1394	pub async fn join_next_round(
1395		&self,
1396		participation: RoundParticipation,
1397		movement_kind: Option<RoundMovement>,
1398	) -> anyhow::Result<StoredRoundState> {
1399		let movement = if let Some(kind) = movement_kind {
1400			Some(self.inner.movements.new_guarded_movement_with_update(
1401				Subsystem::ROUND,
1402				kind.to_string(),
1403				OnDropStatus::Failed,
1404				participation.to_movement_update()?
1405			).await?)
1406		} else {
1407			None
1408		};
1409		let movement_id = movement.as_ref().map(|m| m.id());
1410		let input_vtxos = participation.inputs.iter().map(|v| v.id()).collect::<Vec<_>>();
1411		let state = RoundState::new_interactive(participation, movement_id);
1412
1413		self.lock_vtxos(&input_vtxos, movement_id.map(|m| m.into())).await
1414			.context("failed to lock input VTXOs")?;
1415
1416		match (async || {
1417			let id = self.inner.db.store_round_state(&state).await?;
1418			Ok(self.lock_wait_round_state(id).await?
1419				.context("failed to lock fresh round state")?)
1420		})().await {
1421			Ok(state) => {
1422				if let Some(mut m) = movement {
1423					m.stop();
1424				}
1425				Ok(state)
1426			},
1427			Err(e) => {
1428				self.unlock_vtxos(&input_vtxos).await
1429					.context("failed to unlock input VTXOs")?;
1430				if let Some(mut m) = movement {
1431					m.fail().await.context("failed to mark movement as failed")?;
1432				}
1433				Err(e)
1434			},
1435		}
1436	}
1437
1438	/// Join next round in delegated mode
1439	pub async fn join_next_round_delegated(
1440		&self,
1441		participation: RoundParticipation,
1442		movement_kind: Option<RoundMovement>,
1443	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1444		let movement = if let Some(kind) = movement_kind {
1445			Some(self.inner.movements.new_guarded_movement_with_update(
1446				Subsystem::ROUND,
1447				kind.to_string(),
1448				OnDropStatus::Failed,
1449				participation.to_movement_update()?,
1450			).await?)
1451		} else {
1452			None
1453		};
1454		let movement_id = movement.as_ref().map(|m| m.id());
1455
1456		let input_ids = participation.inputs.iter().map(|v| v.id()).collect::<Vec<_>>();
1457		self.lock_vtxos(&input_ids, movement_id.map(|m| m.into())).await
1458			.context("error locking input VTXOs")?;
1459
1460		match self.join_next_round_delegated_inner(participation, movement_id).await {
1461			Ok(state) => {
1462				if let Some(mut m) = movement {
1463					m.stop();
1464				}
1465				Ok(state)
1466			},
1467			Err(e) => {
1468				self.unlock_vtxos(&input_ids).await
1469					.context("error unlocking input VTXOs")?;
1470				if let Some(mut m) = movement {
1471					m.fail().await.context("error marking movement as failed")?;
1472				}
1473				Err(e)
1474			},
1475		}
1476	}
1477
1478	async fn join_next_round_delegated_inner(
1479		&self,
1480		participation: RoundParticipation,
1481		movement_id: Option<MovementId>,
1482	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1483		let (mut srv, _) = self.require_server().await?;
1484
1485		// Get mailbox identifier for VTXO delivery
1486		let unblinded_mailbox_id = self.mailbox_identifier();
1487
1488		// Register VTXO transaction chains with server before round participation
1489		self.register_vtxo_transactions_with_server(&participation.inputs).await
1490			.context("failed to register input vtxo transactions with server")?;
1491
1492		// Generate attestations for input vtxos
1493		let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1494		for vtxo in participation.inputs.iter() {
1495			let keypair = self.get_vtxo_key(vtxo).await
1496				.context("failed to get vtxo keypair")?;
1497			input_vtxos.push(protos::InputVtxo {
1498				vtxo_id: vtxo.id().to_bytes().to_vec(),
1499				attestation: {
1500					let attestation = DelegatedRoundParticipationAttestation::new(
1501						vtxo.id(), &participation.outputs, &keypair,
1502					);
1503					attestation.serialize()
1504				},
1505			});
1506		}
1507
1508		// Build proto VtxoRequests
1509		let vtxo_requests = participation.outputs.iter()
1510			.map(|req|
1511				protos::VtxoRequest {
1512					policy: req.policy.serialize(),
1513					amount: req.amount.to_sat(),
1514			})
1515			.collect::<Vec<_>>();
1516
1517		// Submit participation to server and get unlock_hash
1518		let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1519			input_vtxos,
1520			vtxo_requests,
1521			unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
1522		}).await.context("error submitting round participation to server")?.into_inner();
1523
1524		let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1525			.context("invalid unlock hash from server")?;
1526
1527		let state = RoundState::new_delegated(participation, unlock_hash, movement_id);
1528
1529		info!("Delegated round participation submitted, it will automatically execute \
1530			when you next sync your wallet after the round happened \
1531			(and has sufficient confirmations).",
1532		);
1533
1534		let id = self.inner.db.store_round_state(&state).await?;
1535		Ok(StoredRoundState::new(id, state))
1536	}
1537
1538	/// Get all pending round states
1539	pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1540		self.inner.db.get_pending_round_state_ids().await
1541	}
1542
1543	/// Get all pending round states
1544	pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1545		let ids = self.inner.db.get_pending_round_state_ids().await?;
1546		let mut states = Vec::with_capacity(ids.len());
1547		for id in ids {
1548			if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1549				states.push(state);
1550			}
1551		}
1552		Ok(states)
1553	}
1554
1555	/// Balance locked in pending rounds
1556	pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1557		let mut ret = Amount::ZERO;
1558		for round in self.pending_round_states().await? {
1559			ret += round.state().pending_balance();
1560		}
1561		Ok(ret)
1562	}
1563
1564	/// Returns all VTXOs that are locked in a pending round
1565	///
1566	/// This excludes all input VTXOs for which the output VTXOs have already
1567	/// been created.
1568	pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1569		let mut ret = Vec::new();
1570		for round in self.pending_round_states().await? {
1571			let inputs = round.state().locked_pending_inputs();
1572			ret.reserve(inputs.len());
1573			for input in inputs {
1574				let v = self.get_vtxo_by_id(input.id()).await
1575					.context("unknown round input VTXO")?;
1576				ret.push(v);
1577			}
1578		}
1579		Ok(ret)
1580	}
1581
1582	/// Sync pending rounds that have finished but are waiting for confirmations
1583	pub async fn sync_pending_rounds(&self) -> anyhow::Result<HashMap<RoundStateId, RoundStatus>> {
1584		let states = self.pending_round_states().await?;
1585		if states.is_empty() {
1586			return Ok(HashMap::new());
1587		}
1588
1589		debug!("Syncing {} pending round states...", states.len());
1590
1591		let ret = Arc::new(parking_lot::Mutex::new(HashMap::with_capacity(states.len())));
1592		tokio_stream::iter(states).for_each_concurrent(10, |state| {
1593			let ret = ret.clone();
1594			async move {
1595				// not processing events here
1596				if state.state().ongoing_participation() {
1597					return;
1598				}
1599
1600				let mut state = match self.lock_wait_round_state(state.id()).await {
1601					Ok(Some(state)) => state,
1602					Ok(None) => return,
1603					Err(e) => {
1604						warn!("Error locking round state: {:#}", e);
1605						return;
1606					},
1607				};
1608
1609				let status = match state.state_mut().sync(self).await {
1610					Ok(s) => s,
1611					Err(e) => {
1612						warn!("Error syncing round: {:#}", e);
1613						return;
1614					},
1615				};
1616				trace!("Synced round #{}, status: {:?}", state.id(), status);
1617				match status {
1618					RoundStatus::Confirmed { funding_txid } => {
1619						info!("Round confirmed. Funding tx {}", funding_txid);
1620						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1621							warn!("Error removing confirmed round state from db: {:#}", e);
1622						}
1623					},
1624					RoundStatus::Unconfirmed { funding_txid } => {
1625						info!("Waiting for confirmations for round funding tx {}", funding_txid);
1626						if let Err(e) = self.inner.db.update_round_state(&state).await {
1627							warn!("Error updating pending round state in db: {:#}", e);
1628						}
1629					},
1630					RoundStatus::Pending => {
1631						if let Err(e) = self.inner.db.update_round_state(&state).await {
1632							warn!("Error updating pending round state in db: {:#}", e);
1633						}
1634					},
1635					RoundStatus::Failed { ref error } => {
1636						error!("Round failed: {}", error);
1637						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1638							warn!("Error removing failed round state from db: {:#}", e);
1639						}
1640					},
1641					RoundStatus::Canceled => {
1642						error!("Round canceled");
1643						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1644							warn!("Error removing canceled round state from db: {:#}", e);
1645						}
1646					},
1647				}
1648				ret.lock().insert(state.id(), status);
1649			}
1650		}).await;
1651
1652		Ok(Arc::into_inner(ret).expect("only ref left").into_inner())
1653	}
1654
1655	/// Fetch last round event from server
1656	async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1657		let (mut srv, _) = self.require_server().await?;
1658		let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1659		Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1660	}
1661
1662	async fn inner_process_event(
1663		&self,
1664		state: &mut StoredRoundState,
1665		event: Option<&RoundEvent>,
1666	) {
1667		if let Some(event) = event && state.state().ongoing_participation() {
1668			let updated = state.state_mut().process_event(self, &event).await;
1669			if updated {
1670				if let Err(e) = self.inner.db.update_round_state(&state).await {
1671					error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1672				}
1673			}
1674		}
1675
1676		match state.state_mut().sync(self).await {
1677			Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1678			Ok(s) if s.is_final() => {
1679				info!("Round #{} finished with result: {:?}", state.id(), s);
1680				if let Err(e) = self.inner.db.remove_round_state(&state).await {
1681					warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1682				}
1683			},
1684			Ok(s) => {
1685				trace!("Round state #{} is now in state {:?}", state.id(), s);
1686				if let Err(e) = self.inner.db.update_round_state(&state).await {
1687					warn!("Error storing round state #{}: {:#}", state.id(), e);
1688				}
1689			},
1690		}
1691	}
1692
1693	/// Try to make incremental progress on all pending round states
1694	///
1695	/// If the `last_round_event` argument is not provided, it will be fetched
1696	/// from the server.
1697	pub async fn progress_pending_rounds(
1698		&self,
1699		last_round_event: Option<&RoundEvent>,
1700	) -> anyhow::Result<()> {
1701		let states = self.pending_round_states().await?;
1702		if states.is_empty() {
1703			return Ok(());
1704		}
1705
1706		info!("Processing {} rounds...", states.len());
1707
1708		let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1709
1710		let has_ongoing_participation = states.iter()
1711			.any(|s| s.state().ongoing_participation());
1712		if has_ongoing_participation && last_round_event.is_none() {
1713			match self.get_last_round_event().await {
1714				Ok(e) => last_round_event = Some(Cow::Owned(e)),
1715				Err(e) => {
1716					warn!("Error fetching round event, \
1717						failed to progress ongoing rounds: {:#}", e);
1718				},
1719			}
1720		}
1721
1722		let event = last_round_event.as_ref().map(|c| c.as_ref());
1723
1724		let futs = states.into_iter().map(async |state| {
1725			let locked = self.lock_wait_round_state(state.id()).await?;
1726			if let Some(mut locked) = locked {
1727				self.inner_process_event(&mut locked, event).await;
1728			}
1729			Ok::<_, anyhow::Error>(())
1730		});
1731
1732		futures::future::join_all(futs).await;
1733
1734		Ok(())
1735	}
1736
1737	pub async fn subscribe_round_events(&self)
1738		-> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1739	{
1740		let (mut srv, _) = self.require_server().await?;
1741		let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1742			.into_inner().map(|m| {
1743				let m = m.context("received error on event stream")?;
1744				let e = RoundEvent::try_from(m.clone())
1745					.with_context(|| format!("error converting rpc round event: {:?}", m))?;
1746				trace!("Received round event: {}", e);
1747				Ok::<_, anyhow::Error>(e)
1748			});
1749		Ok(events)
1750	}
1751
1752	/// A blocking call that will try to perform a full round participation
1753	/// for all ongoing rounds
1754	///
1755	/// Returns only once there is no ongoing rounds anymore.
1756	pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1757		let mut events = self.subscribe_round_events().await?;
1758
1759		loop {
1760			// NB: we need to load all ongoing rounds on every iteration here
1761			// because some might be finished by another call
1762			let state_ids = self.pending_round_states().await?.iter()
1763				.filter(|s| s.state().ongoing_participation())
1764				.map(|s| s.id())
1765				.collect::<Vec<_>>();
1766
1767			if state_ids.is_empty() {
1768				info!("All rounds handled");
1769				return Ok(());
1770			}
1771
1772			let event = events.next().await
1773				.context("events stream broke")?
1774				.context("error on event stream")?;
1775
1776			let futs = state_ids.into_iter().map(async |state| {
1777				let locked = self.lock_wait_round_state(state).await?;
1778				if let Some(mut locked) = locked {
1779					self.inner_process_event(&mut locked, Some(&event)).await;
1780				}
1781				Ok::<_, anyhow::Error>(())
1782			});
1783
1784			futures::future::join_all(futs).await;
1785		}
1786	}
1787
1788	/// Will cancel all pending rounds that can safely be canceled
1789	///
1790	/// All rounds that have not started yet can safely be canceled,
1791	/// as well as rounds where we have not yet signed any forfeit txs.
1792	pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1793		// initial load to get all pending round states ids
1794		let state_ids = self.inner.db.get_pending_round_state_ids().await?;
1795
1796		let futures = state_ids.into_iter().map(|state_id| {
1797			async move {
1798				// wait for lock and load again to ensure most recent state
1799				let mut state = match self.lock_wait_round_state(state_id).await {
1800					Ok(Some(s)) => s,
1801					Ok(None) => return,
1802					Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1803				};
1804
1805				match state.state_mut().try_cancel(self).await {
1806					Ok(true) => {
1807						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1808							warn!("Error removing canceled round state from db: {:#}", e);
1809						}
1810					},
1811					Ok(false) => {},
1812					Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1813				}
1814			}
1815		});
1816
1817		join_all(futures).await;
1818
1819		Ok(())
1820	}
1821
1822	/// Try to cancel the given round
1823	pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1824		let mut state = self.lock_wait_round_state(id).await?
1825			.context("round state not found")?;
1826
1827		if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1828			self.inner.db.remove_round_state(&state).await
1829				.context("error removing canceled round state from db")?;
1830		} else {
1831			bail!("failed to cancel round");
1832		}
1833
1834		Ok(())
1835	}
1836
1837	/// Participate in a round
1838	///
1839	/// This function will start a new round participation and block until
1840	/// the round is finished.
1841	/// After this method returns the round state will be kept active until
1842	/// the round tx fully confirms.
1843	pub(crate) async fn participate_round(
1844		&self,
1845		participation: RoundParticipation,
1846		movement_kind: Option<RoundMovement>,
1847	) -> anyhow::Result<RoundStatus> {
1848		let mut state = self.join_next_round(participation, movement_kind).await?;
1849
1850		info!("Waiting for a round start...");
1851		let mut events = self.subscribe_round_events().await?;
1852
1853		loop {
1854			if !state.state().ongoing_participation() {
1855				let status = state.state_mut().sync(self).await?;
1856				match status {
1857					RoundStatus::Failed { error } => bail!("round failed: {}", error),
1858					RoundStatus::Canceled => bail!("round canceled"),
1859					status => return Ok(status),
1860				}
1861			}
1862
1863			let event = events.next().await
1864				.context("events stream broke")?
1865				.context("error on event stream")?;
1866			if state.state_mut().process_event(self, &event).await {
1867				self.inner.db.update_round_state(&state).await?;
1868			}
1869		}
1870	}
1871}
1872
1873#[cfg(test)]
1874mod test {
1875	use super::*;
1876
1877	use bitcoin::secp256k1::Secp256k1;
1878
1879	fn pubkey() -> bitcoin::secp256k1::PublicKey {
1880		let secp = Secp256k1::new();
1881		Keypair::new(&secp, &mut rand::thread_rng()).public_key()
1882	}
1883
1884	fn nonces() -> Vec<Vec<SecretNonce>> {
1885		let secp = Secp256k1::new();
1886		let key = Keypair::new(&secp, &mut rand::thread_rng());
1887		// Shape mirrors what start_attempt produces: outer Vec is one
1888		// entry per cosign keypair, inner Vec is the tree-depth set of
1889		// pre-generated nonces.
1890		vec![vec![musig::nonce_pair(&key).0, musig::nonce_pair(&key).0]]
1891	}
1892
1893	#[test]
1894	fn stash_and_take() {
1895		let store = RoundSecretNonces::new();
1896		let k = pubkey();
1897		store.stash(k, nonces());
1898
1899		assert!(store.take(&k).is_some());
1900	}
1901
1902	#[test]
1903	fn cannot_take_twice() {
1904		let store = RoundSecretNonces::new();
1905		let k = pubkey();
1906		store.stash(k, nonces());
1907
1908		assert!(store.take(&k).is_some());
1909		assert!(store.take(&k).is_none());
1910	}
1911
1912	#[test]
1913	fn cannot_take_after_forget() {
1914		let store = RoundSecretNonces::new();
1915		let k = pubkey();
1916		store.stash(k, nonces());
1917		store.forget(&k);
1918
1919		assert!(store.take(&k).is_none());
1920	}
1921
1922	#[test]
1923	fn stash_overrides_stash() {
1924		let secp = Secp256k1::new();
1925		let key = Keypair::new(&secp, &mut rand::thread_rng());
1926		let nonces_1 = vec![vec![musig::nonce_pair(&key).0]];
1927		let nonces_2 = vec![];
1928
1929		let store = RoundSecretNonces::new();
1930		store.stash(key.public_key(), nonces_1);
1931		store.stash(key.public_key(), nonces_2);
1932
1933		let taken = store.take(&key.public_key()).expect("nonces present");
1934		assert_eq!(taken.len(), 0);
1935	}
1936}