Skip to main content

bark/round/
mod.rs

1//!
2//! Round State Machine
3//!
4
5use std::collections::HashMap;
6use std::iter;
7use std::borrow::Cow;
8use std::convert::Infallible;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use anyhow::Context;
13use ark::vtxo::VtxoValidationError;
14use bdk_esplora::esplora_client::Amount;
15use bip39::rand;
16use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
17use bitcoin::consensus::encode::{deserialize, serialize_hex};
18use bitcoin::hashes::Hash;
19use bitcoin::hex::DisplayHex;
20use bitcoin::key::Keypair;
21use bitcoin::secp256k1::schnorr;
22use futures::future::{join_all, try_join_all};
23use futures::{Stream, StreamExt};
24use log::{debug, error, info, trace, warn};
25
26use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
27use ark::vtxo::Full;
28use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
29use ark::forfeit::HashLockedForfeitBundle;
30use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
31use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
32use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
33use bitcoin_ext::TxStatus;
34use server_rpc::{protos, ServerConnection, TryFromBytes};
35
36use crate::{SECP, Wallet, WalletVtxo};
37use crate::movement::{MovementId, MovementStatus};
38use crate::movement::update::MovementUpdate;
39use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
40
/// How long [`Wallet::lock_wait_round_state`] waits for a contended
/// round lock before giving up.
//
// NOTE(review): the value is 10 seconds. Whether that is long enough to
// outlast a normal round depends on the server's round timing, which is not
// visible in this file -- confirm against the actual round duration.
const ROUND_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
44use crate::subsystem::{RoundMovement, Subsystem};
45
46
/// The type string for the hArk leaf transition
///
/// Compared against the `transition_kind` reported by
/// [`VtxoValidationError::GenesisTransition`] when checking that a freshly
/// issued VTXO only fails validation on its hash lock.
const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
49
/// Struct to communicate your specific participation for an Ark round.
///
/// It lists the VTXOs spent in the round, the VTXOs requested in return and
/// an optional mailbox identifier.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoundParticipation {
	/// The input VTXOs that we spend in the round
	#[serde(with = "ark::encode::serde::vec")]
	pub inputs: Vec<Vtxo<Full>>,
	/// The output VTXOs that we request in the round,
	/// including change
	pub outputs: Vec<VtxoRequest>,
	/// Optional mailbox identifier for round completion notification
	///
	/// Omitted from the serialized form when `None` and defaults to `None`
	/// when absent on deserialization.
	#[serde(default, skip_serializing_if = "Option::is_none", with = "ark::encode::serde::opt")]
	pub unblinded_mailbox_id: Option<ark::mailbox::MailboxIdentifier>,
}
62
63impl RoundParticipation {
64	pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
65		let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
66		let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
67		let fee = input_amount - output_amount;
68		Ok(MovementUpdate::new()
69			.consumed_vtxos(&self.inputs)
70			.intended_balance(SignedAmount::ZERO)
71			.effective_balance( - fee.to_signed()?)
72			.fee(fee)
73		)
74	}
75}
76
/// The status of a round participation as reported by [`RoundState::sync`].
#[derive(Debug, Clone)]
pub enum RoundStatus {
	/// The round was successful and is fully confirmed
	Confirmed {
		/// Txid of the round's funding transaction
		funding_txid: Txid,
	},
	/// Round successful but not fully confirmed
	Unconfirmed {
		/// Txid of the round's funding transaction
		funding_txid: Txid,
	},
	/// Round didn't finish yet
	Pending,
	/// The round failed
	Failed {
		/// Human-readable description of the failure
		error: String,
	},
	/// User canceled the round
	Canceled,
}
96
97impl RoundStatus {
98	/// Whether this is the final state and it won't change anymore
99	pub fn is_final(&self) -> bool {
100		match self {
101			Self::Confirmed { .. } => true,
102			Self::Unconfirmed { .. } => false,
103			Self::Pending => false,
104			Self::Failed { .. } => true,
105			Self::Canceled => true,
106		}
107	}
108
109	/// Whether it looks like the round succeeded
110	pub fn is_success(&self) -> bool {
111		match self {
112			Self::Confirmed { .. } => true,
113			Self::Unconfirmed { .. } => true,
114			Self::Pending => false,
115			Self::Failed { .. } => false,
116			Self::Canceled => false,
117		}
118	}
119}
120
/// State of the progress of a round participation
///
/// An instance of this struct is kept all the way from the intention of joining
/// the next round, until either the round fully confirms or it fails and we are
/// sure it won't have any effect on our wallet.
///
/// As soon as we have signed forfeit txs for the round, we keep track of this
/// round attempt until we see another attempt we participated in confirm or
/// we gain confidence that the failed attempt will never confirm.
//
//TODO(stevenroose) move the id in here and have the state persist itself with the wallet
// to have better control. this way we can touch db before we sent forfeit sigs
pub struct RoundState {
	/// Round is fully done
	///
	/// Set by [`RoundState::sync`] after the successful round was persisted.
	pub(crate) done: bool,

	/// Our participation in this round
	pub(crate) participation: RoundParticipation,

	/// The flow of the round in case it is still ongoing with the server
	pub(crate) flow: RoundFlowState,

	/// The new output vtxos of this round participation
	///
	/// After we finish the interactive part, we fill this with the uncompleted
	/// VTXOs which we then try to complete with the unlock preimage.
	pub(crate) new_vtxos: Vec<Vtxo<Full>>,

	/// Whether we sent our forfeit signatures to the server
	///
	/// If we did this and the server refused to reveal our new VTXOs,
	/// we will be forced to exit.
	//TODO(stevenroose) implement exit when this is true and we can't make progress
	// probably based on the input vtxos becoming close to expiry
	pub(crate) sent_forfeit_sigs: bool,

	/// The ID of the [Movement] associated with this round
	///
	/// `None` when no movement is tracked for this round.
	pub(crate) movement_id: Option<MovementId>,
}
160
161impl RoundState {
162	fn new_interactive(
163		participation: RoundParticipation,
164		movement_id: Option<MovementId>,
165	) -> Self {
166		Self {
167			participation,
168			movement_id,
169			flow: RoundFlowState::InteractivePending,
170			new_vtxos: Vec::new(),
171			sent_forfeit_sigs: false,
172			done: false,
173		}
174	}
175
176	fn new_non_interactive(
177		participation: RoundParticipation,
178		unlock_hash: UnlockHash,
179		movement_id: Option<MovementId>,
180	) -> Self {
181		Self {
182			participation,
183			movement_id,
184			flow: RoundFlowState::NonInteractivePending { unlock_hash },
185			new_vtxos: Vec::new(),
186			sent_forfeit_sigs: false,
187			done: false,
188		}
189	}
190
	/// Our participation in this round
	///
	/// Returns a reference to the inputs and requested outputs this state
	/// was created with.
	pub fn participation(&self) -> &RoundParticipation {
		&self.participation
	}
195
196	/// the unlock hash if already known
197	pub fn unlock_hash(&self) -> Option<UnlockHash> {
198		match self.flow {
199			RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
200			RoundFlowState::InteractivePending => None,
201			RoundFlowState::InteractiveOngoing { .. } => None,
202			RoundFlowState::Failed { .. } => None,
203			RoundFlowState::Canceled => None,
204			RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
205		}
206	}
207
208	pub fn funding_tx(&self) -> Option<&Transaction> {
209		match self.flow {
210			RoundFlowState::NonInteractivePending { .. } => None,
211			RoundFlowState::InteractivePending => None,
212			RoundFlowState::InteractiveOngoing { .. } => None,
213			RoundFlowState::Failed { .. } => None,
214			RoundFlowState::Canceled => None,
215			RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
216		}
217	}
218
219	/// Whether the interactive part of the round is still ongoing
220	pub fn ongoing_participation(&self) -> bool {
221		match self.flow {
222			RoundFlowState::NonInteractivePending { .. } => false,
223			RoundFlowState::InteractivePending => true,
224			RoundFlowState::InteractiveOngoing { .. } => true,
225			RoundFlowState::Failed { .. } => false,
226			RoundFlowState::Canceled => false,
227			RoundFlowState::Finished { .. } => false,
228		}
229	}
230
231	/// Tries to cancel the round and returns whether it was succesfully canceled
232	/// or if it was already canceled or failed
233	pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
234		let ret = match self.flow {
235			RoundFlowState::NonInteractivePending { .. } => {
236				//TODO(stevenroose) we have to cancel with server
237				bail!("it is currently not yet possible to cancel pending delegated rounds");
238			},
239			RoundFlowState::Canceled => true,
240			RoundFlowState::Failed { .. } => true,
241			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
242				self.flow = RoundFlowState::Canceled;
243				true
244			},
245			RoundFlowState::Finished { .. } => false,
246		};
247		if ret {
248			persist_round_failure(wallet, &self.participation, self.movement_id).await
249				.context("failed to persist round failure for cancelation")?;
250		}
251		Ok(ret)
252	}
253
254	async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
255		match start_attempt(wallet, &self.participation, attempt).await {
256			Ok(state) => {
257				self.flow = RoundFlowState::InteractiveOngoing {
258					round_seq: attempt.round_seq,
259					attempt_seq: attempt.attempt_seq,
260					state: state,
261				};
262			},
263			Err(e) => {
264				self.flow = RoundFlowState::Failed {
265					error: format!("{:#}", e),
266				};
267			},
268		}
269	}
270
271	/// Processes the given event and returns true if some update was made to the state
272	pub async fn process_event(
273		&mut self,
274		wallet: &Wallet,
275		event: &RoundEvent,
276	) -> bool {
277		let _: Infallible = match self.flow {
278			RoundFlowState::InteractivePending => {
279				if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
280					trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
281					self.try_start_attempt(wallet, e).await;
282					return true;
283				} else {
284					trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
285						event.kind(), event.round_seq(), event.attempt_seq(),
286					);
287					return false;
288				}
289			},
290			RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
291				// here we catch the cases where we're in a wrong flow
292
293				if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
294					warn!("Round {} failed by server", round_seq);
295					self.flow = RoundFlowState::Failed {
296						error: format!("round {} failed by server", round_seq),
297					};
298					return true;
299				}
300
301				if event.round_seq() > round_seq {
302					// new round started, we don't support multiple parallel rounds,
303					// this means we failed
304					self.flow = RoundFlowState::Failed {
305						error: format!("round {} started while we were on {}",
306							event.round_seq(), round_seq,
307						),
308					};
309					return true;
310				}
311
312				if event.attempt_seq() < attempt_seq {
313					trace!("ignoring replayed message from old attempt");
314					return false;
315				}
316
317				if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
318					trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
319					self.try_start_attempt(wallet, e).await;
320					return true;
321				}
322				trace!("Processing event {} for round attempt {}:{} in state {}",
323					event.kind(), round_seq, attempt_seq, state.kind(),
324				);
325
326				return match progress_attempt(state, wallet, &self.participation, event).await {
327					AttemptProgressResult::NotUpdated => false,
328					AttemptProgressResult::Updated { new_state } => {
329						*state = new_state;
330						true
331					},
332					AttemptProgressResult::Failed(e) => {
333						warn!("Round failed with error: {:#}", e);
334						self.flow = RoundFlowState::Failed {
335							error: format!("{:#}", e),
336						};
337						true
338					},
339					AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
340						self.new_vtxos = vtxos;
341						let funding_txid = funding_tx.compute_txid();
342						self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
343						if let Some(mid) = self.movement_id {
344							if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
345								warn!("Error updating the round funding txid: {:#}", e);
346							}
347						}
348						true
349					},
350				};
351			},
352			RoundFlowState::NonInteractivePending { .. }
353				| RoundFlowState::Finished { .. }
354				| RoundFlowState::Failed { .. }
355				| RoundFlowState::Canceled => return false,
356		};
357	}
358
359	/// Sync the round's status and return it
360	///
361	/// When success or failure is returned, the round state can be eliminated
362	//TODO(stevenroose) make RoundState manage its own db record
363	pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
364		match self.flow {
365			RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
366				Ok(RoundStatus::Confirmed {
367					funding_txid: funding_tx.compute_txid(),
368				})
369			},
370
371			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
372				Ok(RoundStatus::Pending)
373			},
374			RoundFlowState::Failed { ref error } => {
375				persist_round_failure(wallet, &self.participation, self.movement_id).await
376					.context("failed to persist round failure")?;
377				Ok(RoundStatus::Failed { error: error.clone() })
378			},
379			RoundFlowState::Canceled => {
380				persist_round_failure(wallet, &self.participation, self.movement_id).await
381					.context("failed to persist round failure")?;
382				Ok(RoundStatus::Canceled)
383			},
384
385			RoundFlowState::NonInteractivePending { unlock_hash } => {
386				match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
387					Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
388					Ok(HarkProgressResult::RoundNotFound) => {
389						self.handle_round_not_found(wallet).await
390					},
391					Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
392						let funding_txid = funding_tx.compute_txid();
393						self.new_vtxos = new_vtxos;
394						self.flow = RoundFlowState::Finished {
395							funding_tx: funding_tx.clone(),
396							unlock_hash: unlock_hash,
397						};
398
399						persist_round_success(
400							wallet,
401							&self.participation,
402							self.movement_id,
403							&self.new_vtxos,
404							&funding_tx,
405						).await.context("failed to store successful round in DB!")?;
406
407						self.done = true;
408
409						Ok(RoundStatus::Confirmed { funding_txid })
410					},
411					Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
412						if let Some(mid) = self.movement_id {
413							update_funding_txid(wallet, mid, funding_txid).await
414								.context("failed to update funding txid in DB")?;
415						}
416						Ok(RoundStatus::Unconfirmed { funding_txid })
417					},
418
419					//TODO(stevenroose) should we mark as failed for these cases?
420
421					Err(HarkForfeitError::Err(e)) => {
422						//TODO(stevenroose) we failed here but we might actualy be able to
423						// succeed if we retry. should we implement some kind of limited
424						// retry after which we mark as failed?
425						Err(e.context("error progressing non-interactive round"))
426					},
427					Err(HarkForfeitError::SentForfeits(e)) => {
428						self.sent_forfeit_sigs = true;
429						Err(e.context("error progressing non-interactive round \
430							after sending forfeit tx signatures"))
431					},
432				}
433			},
434			// interactive part finished, but didn't forfeit yet
435			RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
436				let funding_txid = funding_tx.compute_txid();
437				let confirmed = check_funding_tx_confirmations(
438					wallet, funding_txid, &funding_tx,
439				).await.context("error checking funding tx confirmations")?;
440				if !confirmed {
441					trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
442					return Ok(RoundStatus::Unconfirmed { funding_txid });
443				}
444
445				match hark_vtxo_swap(
446					wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
447				).await {
448					Ok(()) => {
449						persist_round_success(
450							wallet,
451							&self.participation,
452							self.movement_id,
453							&self.new_vtxos,
454							&funding_tx,
455						).await.context("failed to store successful round in DB!")?;
456
457						self.done = true;
458
459						Ok(RoundStatus::Confirmed { funding_txid })
460					},
461					Err(HarkForfeitError::Err(e)) => {
462						Err(e.context("error forfeiting VTXOs after round"))
463					},
464					Err(HarkForfeitError::SentForfeits(e)) => {
465						self.sent_forfeit_sigs = true;
466						Err(e.context("error after having signed and sent \
467							forfeit signatures to server"))
468					},
469				}
470			},
471		}
472	}
473
474	/// Once we know the signed round funding tx, this returns the output VTXOs
475	/// for this round.
476	pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
477		if self.new_vtxos.is_empty() {
478			None
479		} else {
480			Some(&self.new_vtxos)
481		}
482	}
483
484	/// Returns the input VTXOs that are locked in this round, but only
485	/// if no output VTXOs were issued yet.
486	pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
487		//TODO(stevenroose) consider if we can't just drop the state after forfeit exchange
488		match self.flow {
489			RoundFlowState::NonInteractivePending { .. }
490				| RoundFlowState::InteractivePending
491				| RoundFlowState::InteractiveOngoing { .. }
492			=> {
493				&self.participation.inputs
494			},
495			RoundFlowState::Finished { .. } => if self.done {
496				// inputs already unlocked
497				&[]
498			} else {
499				&self.participation.inputs
500			},
501			RoundFlowState::Failed { .. }
502				| RoundFlowState::Canceled
503			=> {
504				// inputs already unlocked
505				&[]
506			},
507		}
508	}
509
510	/// The balance pending in this round
511	///
512	/// This becomes zero once the new round VTXOs are unlocked.
513	pub fn pending_balance(&self) -> Amount {
514		if self.done {
515			return Amount::ZERO;
516		}
517
518		match self.flow {
519			RoundFlowState::NonInteractivePending { .. }
520				| RoundFlowState::InteractivePending
521				| RoundFlowState::InteractiveOngoing { .. }
522				| RoundFlowState::Finished { .. }
523			=> {
524				self.participation.outputs.iter().map(|o| o.amount).sum()
525			},
526			RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
527				Amount::ZERO
528			},
529		}
530	}
531
532	/// Handle the case where the server reports our round participation as not found.
533	///
534	/// If we sent forfeit signatures (which only happens after the round was
535	/// confirmed), this is adversarial — trigger unilateral exit.
536	/// If we never sent forfeits, the server can't steal, so unlock the VTXOs
537	/// and verify with the server that they're still considered spendable.
538	async fn handle_round_not_found(
539		&mut self,
540		wallet: &Wallet,
541	) -> anyhow::Result<RoundStatus> {
542		info!("Server reports round participation not found (no forfeits sent)");
543		self.flow = RoundFlowState::Failed {
544			error: "server reports round participation not found".into(),
545		};
546		persist_round_failure(wallet, &self.participation, self.movement_id).await
547			.context("failed to persist round failure")?;
548
549		Ok(RoundStatus::Failed {
550			error: "server reports round participation not found".into(),
551		})
552	}
553}
554
/// The state of the process flow of a round
///
/// This tracks the progress of the interactive part of the round, from
/// waiting to start until finishing either successfully or with a failure.
pub enum RoundFlowState {
	/// We don't do flow and we just wait for the round to finish
	NonInteractivePending {
		/// The unlock hash used to progress the round non-interactively
		unlock_hash: UnlockHash,
	},

	/// Waiting for round to happen
	InteractivePending,
	/// Interactive part ongoing
	InteractiveOngoing {
		/// Sequence number of the round we are participating in
		round_seq: RoundSeq,
		/// Sequence number of the attempt we joined within that round
		attempt_seq: usize,
		/// Our state within that attempt
		state: AttemptState,
	},

	/// Interactive part finished, waiting for confirmation
	Finished {
		/// The round's funding transaction
		funding_tx: Transaction,
		/// The unlock hash used for the forfeit exchange
		unlock_hash: UnlockHash,
	},

	/// Failed during round
	Failed {
		/// Human-readable description of the failure
		error: String,
	},

	/// User canceled round
	Canceled,
}
588
/// The state of a single round attempt
///
/// For each attempt that we participate in, we keep the state of our concrete
/// participation.
pub enum AttemptState {
	/// Waiting for a round attempt to start
	AwaitingAttempt,
	/// We submitted our participation and wait for the unsigned VTXO tree
	AwaitingUnsignedVtxoTree {
		/// The cosign keys we generated, one per requested output
		cosign_keys: Vec<Keypair>,
		/// The secret nonces we generated, one list per cosign key
		secret_nonces: Vec<Vec<DangerousSecretNonce>>,
		/// The unlock hash the server returned for our submission
		unlock_hash: UnlockHash,
	},
	/// Waiting for the round to finish
	AwaitingFinishedRound {
		/// The unsigned round transaction
		unsigned_round_tx: Transaction,
		/// The spec of the round's VTXO tree
		vtxos_spec: VtxoTreeSpec,
		/// The unlock hash for our participation
		unlock_hash: UnlockHash,
	},
}
606
607impl AttemptState {
608	/// The state kind represented as a string
609	fn kind(&self) -> &'static str {
610		match self {
611			Self::AwaitingAttempt => "AwaitingAttempt",
612			Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
613			Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
614		}
615	}
616}
617
/// Result from trying to progress an ongoing round attempt
enum AttemptProgressResult {
	/// The interactive part of the round finished
	Finished {
		/// The round's funding transaction
		funding_tx: Transaction,
		/// The new VTXOs issued to us in the round
		vtxos: Vec<Vtxo<Full>>,
		/// The unlock hash for our participation
		unlock_hash: UnlockHash,
	},
	/// The attempt failed with the given error
	Failed(anyhow::Error),
	/// When the state changes, this variant is returned
	///
	/// It carries the new attempt state that replaces the current one.
	Updated {
		new_state: AttemptState,
	},
	/// The event did not change the attempt state
	NotUpdated,
}
636
637/// Participate in the new round attempt by submitting our round participation
638async fn start_attempt(
639	wallet: &Wallet,
640	participation: &RoundParticipation,
641	event: &RoundAttempt,
642) -> anyhow::Result<AttemptState> {
643	let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
644
645	// Assign cosign pubkeys to the payment requests.
646	let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
647		.take(participation.outputs.len())
648		.collect::<Vec<_>>();
649
650	// Prepare round participation info.
651	// For each of our requested vtxo output, we need a set of public and secret nonces.
652	let cosign_nonces = cosign_keys.iter()
653		.map(|key| {
654			let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
655			let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
656			for _ in 0..ark_info.nb_round_nonces {
657				let (s, p) = musig::nonce_pair(key);
658				secs.push(s);
659				pubs.push(p);
660			}
661			(secs, pubs)
662		})
663		.take(participation.outputs.len())
664		.collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
665
666
667	// The round has now started. We can submit our payment.
668	debug!("Submitting payment request with {} inputs and {} vtxo outputs",
669		participation.inputs.len(), participation.outputs.len(),
670	);
671
672	// Build signed requests with mailbox IDs
673	let unblinded_mailbox_id = wallet.mailbox_identifier();
674	let signed_reqs = participation.outputs.iter()
675		.zip(cosign_keys.iter())
676		.zip(cosign_nonces.iter())
677		.map(|((req, cosign_key), (_sec, pub_nonces))| {
678			SignedVtxoRequest {
679				vtxo: req.clone(),
680				cosign_pubkey: cosign_key.public_key(),
681				nonces: pub_nonces.clone(),
682			}
683		})
684		.collect::<Vec<_>>();
685
686	let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
687	for vtxo in participation.inputs.iter() {
688		let keypair = wallet.get_vtxo_key(vtxo).await
689			.map_err(HarkForfeitError::Err)?;
690		input_vtxos.push(protos::InputVtxo {
691			vtxo_id: vtxo.id().to_bytes().to_vec(),
692			attestation: {
693				let attestation = RoundAttemptAttestation::new(
694					event.challenge, vtxo.id(), &signed_reqs, &keypair,
695				);
696				attestation.serialize()
697			},
698		});
699	}
700
701	// Register VTXO transaction chains with server before round participation
702	wallet.register_vtxo_transactions_with_server(&participation.inputs).await
703		.map_err(HarkForfeitError::Err)?;
704
705	let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
706		input_vtxos: input_vtxos,
707		vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
708		#[allow(deprecated)]
709		offboard_requests: vec![],
710		unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
711	}).await.context("Ark server refused our payment submission")?;
712
713	Ok(AttemptState::AwaitingUnsignedVtxoTree {
714		unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
715		cosign_keys: cosign_keys,
716		secret_nonces: cosign_nonces.into_iter()
717			.map(|(sec, _pub)| sec.into_iter()
718				.map(DangerousSecretNonce::dangerous_from_secret_nonce)
719				.collect())
720			.collect(),
721	})
722}
723
/// just an internal type; need Error trait to work with anyhow
///
/// It distinguishes errors by whether they happened before or after our
/// forfeit signatures were sent to the server, so that callers can record
/// that the signatures are out.
#[derive(Debug, thiserror::Error)]
enum HarkForfeitError {
	/// An error happened after we sent forfeit signatures to the server
	#[error("error after forfeits were sent")]
	SentForfeits(#[source] anyhow::Error),
	/// An error happened before we sent forfeit signatures to the server
	#[error("error before forfeits were sent")]
	Err(#[source] anyhow::Error),
}
734
735async fn hark_cosign_leaf(
736	wallet: &Wallet,
737	srv: &mut ServerConnection,
738	funding_tx: &Transaction,
739	vtxo: &mut Vtxo<Full>,
740) -> anyhow::Result<()> {
741	let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
742		.context("error fetching keypair").map_err(HarkForfeitError::Err)?
743		.with_context(|| format!(
744			"keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
745		))?.1;
746	let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
747	let cosign_resp = srv.client.request_leaf_vtxo_cosign(
748		protos::LeafVtxoCosignRequest::from(cosign_req),
749	).await
750		.with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
751		.into_inner().try_into()
752		.context("bad leaf vtxo cosign response")?;
753	ensure!(ctx.finalize(vtxo, cosign_resp),
754		"failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
755	);
756
757	Ok(())
758}
759
760/// Finish the hArk VTXO swap protocol
761///
762/// This includes:
763/// - requesting cosignature of the locked hArk leaves
764/// - sending forfeit txs to the server in return for the unlock preimage
765///
766/// NB all the actions in this function are idempotent, meaning that the server
767/// allows them to be done multiple times. this means that if this function calls
768/// fails, it's safe to just call it again at a later time
769async fn hark_vtxo_swap(
770	wallet: &Wallet,
771	participation: &RoundParticipation,
772	output_vtxos: &mut [Vtxo<Full>],
773	funding_tx: &Transaction,
774	unlock_hash: UnlockHash,
775) -> Result<(), HarkForfeitError> {
776	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
777
778	// before we start make sure the server has our input vtxo signatures
779	wallet.register_vtxo_transactions_with_server(&participation.inputs).await
780		.context("couldn't send our input vtxo transactions to server")
781		.map_err(HarkForfeitError::Err)?;
782
783	// first get the leaves signed
784	for vtxo in output_vtxos.iter_mut() {
785		hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
786			.map_err(HarkForfeitError::Err)?;
787	}
788
789	// then do the forfeit dance
790
791	let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
792		unlock_hash: unlock_hash.to_byte_array().to_vec(),
793		vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
794	}).await
795		.context("request forfeits nonces call failed")
796		.map_err(HarkForfeitError::Err)?
797		.into_inner().public_nonces.into_iter()
798		.map(|b| musig::PublicNonce::from_bytes(b))
799		.collect::<Result<Vec<_>, _>>()
800		.context("invalid forfeit nonces")
801		.map_err(HarkForfeitError::Err)?;
802
803	if server_nonces.len() != participation.inputs.len() {
804		return Err(HarkForfeitError::Err(anyhow!(
805			"server sent {} nonce pairs, expected {}",
806			server_nonces.len(), participation.inputs.len(),
807		)));
808	}
809
810	let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
811	for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
812		let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
813			.ok().flatten().with_context(|| format!(
814				"failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
815			)).map_err(HarkForfeitError::Err)?.1;
816		forfeit_bundles.push(HashLockedForfeitBundle::new(
817			input, unlock_hash, &user_key, &nonces,
818		))
819	}
820
821	let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
822		forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
823	}).await
824		.context("forfeit vtxos call failed")
825		.map_err(HarkForfeitError::SentForfeits)?
826		.into_inner().unlock_preimage.as_slice().try_into()
827		.context("invalid preimage length")
828		.map_err(HarkForfeitError::SentForfeits)?;
829
830	for vtxo in output_vtxos.iter_mut() {
831		if !vtxo.provide_unlock_preimage(preimage) {
832			return Err(HarkForfeitError::SentForfeits(anyhow!(
833				"invalid preimage {} for vtxo {} with supposed unlock hash {}",
834				preimage.as_hex(), vtxo.id(), unlock_hash,
835			)));
836		}
837
838		// then validate the vtxo works
839		vtxo.validate(&funding_tx).with_context(|| format!(
840			"new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
841		)).map_err(HarkForfeitError::SentForfeits)?;
842	}
843
844	Ok(())
845}
846
847fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
848	match vtxo.validate(funding_tx) {
849		Err(VtxoValidationError::GenesisTransition {
850			genesis_idx, genesis_len, transition_kind, ..
851		}) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
852		Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
853			vtxo.serialize_hex(),
854		)),
855		Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
856	}
857}
858
859fn check_round_matches_participation(
860	part: &RoundParticipation,
861	new_vtxos: &[Vtxo<Full>],
862	funding_tx: &Transaction,
863) -> anyhow::Result<()> {
864	ensure!(new_vtxos.len() == part.outputs.len(),
865		"unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
866	);
867
868	for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
869		ensure!(vtxo.amount() == req.amount,
870			"unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
871		);
872		ensure!(*vtxo.policy() == req.policy,
873			"unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
874		);
875
876		// We accept the VTXO if only the hArk transition (last) failure happens
877		check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
878	}
879
880	Ok(())
881}
882
883/// Check the confirmation status of a funding tx
884///
885/// Returns true if the funding tx is confirmed deeply enough for us to accept it.
886/// The required number of confirmations depends on the wallet's configuration.
887///
888/// Returns false if the funding tx seems valid but not confirmed yet.
889///
890/// Returns an error if the chain source fails or if we can't submit the tx to the
891/// mempool, suggesting it might be double spent.
892async fn check_funding_tx_confirmations(
893	wallet: &Wallet,
894	funding_txid: Txid,
895	funding_tx: &Transaction,
896) -> anyhow::Result<bool> {
897	let tip = wallet.inner.chain.tip().await.context("chain source error")?;
898	let conf_height = tip - wallet.inner.config.round_tx_required_confirmations + 1;
899	let tx_status = wallet.inner.chain.tx_status(funding_txid).await.context("chain source error")?;
900	trace!("Round funding tx {} confirmation status: {:?} (tip={})",
901		funding_txid, tx_status, tip,
902	);
903	match tx_status {
904		TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
905		TxStatus::Mempool | TxStatus::Confirmed(_) => {
906			if wallet.inner.config.round_tx_required_confirmations == 0 {
907				debug!("Accepting round funding tx without confirmations because of configuration");
908				Ok(true)
909			} else {
910				trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
911				Ok(false)
912			}
913		},
914		TxStatus::NotFound => {
915			// let's try to submit it to our mempool
916			//TODO(stevenroose) change this to an explicit "testmempoolaccept" so that we can
917			// reliably distinguish the cases of our chain source having issues and the tx
918			// actually being rejected which suggests the round was double-spent
919			if let Err(e) = wallet.inner.chain.broadcast_tx(&funding_tx).await {
920				Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
921					funding_txid, serialize_hex(funding_tx), e,
922				))
923			} else {
924				trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
925				Ok(false)
926			}
927		},
928	}
929}
930
931enum HarkProgressResult {
932	RoundPending,
933	RoundNotFound,
934	FundingTxUnconfirmed {
935		funding_txid: Txid,
936	},
937	Ok {
938		funding_tx: Transaction,
939		new_vtxos: Vec<Vtxo<Full>>,
940	},
941}
942
943async fn progress_non_interactive(
944	wallet: &Wallet,
945	participation: &RoundParticipation,
946	unlock_hash: UnlockHash,
947) -> Result<HarkProgressResult, HarkForfeitError> {
948	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
949
950	let resp = match srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
951		unlock_hash: unlock_hash.to_byte_array().to_vec(),
952	}).await {
953		Ok(resp) => resp.into_inner(),
954		Err(err) if err.code() == tonic::Code::NotFound => {
955			return Ok(HarkProgressResult::RoundNotFound);
956		},
957		Err(err) => {
958			return Err(HarkForfeitError::Err(
959				anyhow::Error::from(err).context("error checking round participation status"),
960			));
961		},
962	};
963	let status = protos::RoundParticipationStatus::try_from(resp.status)
964		.context("unknown status from server")
965		.map_err(HarkForfeitError::Err)	?;
966
967	if status == protos::RoundParticipationStatus::RoundPartPending {
968		trace!("Hark round still pending");
969		return Ok(HarkProgressResult::RoundPending);
970	}
971
972	// Since we got here, we clearly don't think we're finished.
973	// So even if the server thinks we did the dance before, we need the
974	// cosignature on the leaf tx so we need to do the dance again.
975	// "Guilty feet have got no rhythm."
976	if status == protos::RoundParticipationStatus::RoundPartReleased {
977		let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
978		warn!("Server says preimage was already released for hArk participation \
979			with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
980		);
981	}
982
983	let funding_tx_bytes = resp.round_funding_tx
984		.context("funding txid should be provided when status is not pending")
985		.map_err(HarkForfeitError::Err)?;
986	let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
987		.context("invalid funding txid")
988		.map_err(HarkForfeitError::Err)?;
989	let funding_txid = funding_tx.compute_txid();
990	trace!("Funding tx for round participation with unlock hash {}: {} ({})",
991		unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
992	);
993
994	// Check the confirmation status of the funding tx
995	match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
996		Ok(true) => {},
997		Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
998		Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
999	}
1000
1001	let mut new_vtxos = resp.output_vtxos.into_iter()
1002		.map(|v| <Vtxo<Full>>::deserialize(&v))
1003		.collect::<Result<Vec<_>, _>>()
1004		.context("invalid output VTXOs from server")
1005		.map_err(HarkForfeitError::Err)?;
1006
1007	// Check that the vtxos match our participation in the exact order
1008	check_round_matches_participation(participation, &new_vtxos, &funding_tx)
1009		.context("new VTXOs received from server don't match our participation")
1010		.map_err(HarkForfeitError::Err)?;
1011
1012	hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
1013		.context("error forfeiting hArk VTXOs")
1014		.map_err(HarkForfeitError::SentForfeits)?;
1015
1016	Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
1017}
1018
1019async fn progress_attempt(
1020	state: &AttemptState,
1021	wallet: &Wallet,
1022	part: &RoundParticipation,
1023	event: &RoundEvent,
1024) -> AttemptProgressResult {
1025	// we will match only the states and messages required to make progress,
1026	// all else we ignore, except an unexpected finish
1027
1028	match (state, event) {
1029
1030		(
1031			AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
1032			RoundEvent::VtxoProposal(e),
1033		) => {
1034			trace!("Received VtxoProposal: {:#?}", e);
1035			match sign_vtxo_tree(
1036				wallet,
1037				part,
1038				&cosign_keys,
1039				&secret_nonces,
1040				&e.unsigned_round_tx,
1041				&e.vtxos_spec,
1042				&e.cosign_agg_nonces,
1043			).await {
1044				Ok(()) => {
1045					AttemptProgressResult::Updated {
1046						new_state: AttemptState::AwaitingFinishedRound {
1047							unsigned_round_tx: e.unsigned_round_tx.clone(),
1048							vtxos_spec: e.vtxos_spec.clone(),
1049							unlock_hash: *unlock_hash,
1050						},
1051					}
1052				},
1053				Err(e) => {
1054					trace!("Error signing VTXO tree: {:#}", e);
1055					AttemptProgressResult::Failed(e)
1056				},
1057			}
1058		},
1059
1060		(
1061			AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1062			RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1063		) => {
1064			if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1065				return AttemptProgressResult::Failed(anyhow!(
1066					"signed funding tx ({}) doesn't match tx received before ({})",
1067					signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1068				));
1069			}
1070
1071			if let Err(e) = wallet.inner.chain.broadcast_tx(&signed_round_tx).await {
1072				warn!("Failed to broadcast signed round tx: {:#}", e);
1073			}
1074
1075			match construct_new_vtxos(
1076				part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1077			).await {
1078				Ok(v) => AttemptProgressResult::Finished {
1079					funding_tx: signed_round_tx.clone(),
1080					vtxos: v,
1081					unlock_hash: *unlock_hash,
1082				},
1083				Err(e) => AttemptProgressResult::Failed(anyhow!(
1084					"failed to construct new VTXOs for round: {:#}", e,
1085				)),
1086			}
1087		},
1088
1089		(state, RoundEvent::Finished(RoundFinished { .. })) => {
1090			AttemptProgressResult::Failed(anyhow!(
1091				"unexpectedly received a finished round while we were in state {}",
1092				state.kind(),
1093			))
1094		},
1095
1096		(state, _) => {
1097			trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1098			AttemptProgressResult::NotUpdated
1099		},
1100	}
1101}
1102
1103async fn sign_vtxo_tree(
1104	wallet: &Wallet,
1105	participation: &RoundParticipation,
1106	cosign_keys: &[Keypair],
1107	secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1108	unsigned_round_tx: &Transaction,
1109	vtxo_tree: &VtxoTreeSpec,
1110	cosign_agg_nonces: &[musig::AggregatedNonce],
1111) -> anyhow::Result<()> {
1112	let (srv, _) = wallet.require_server().await.context("server not available")?;
1113
1114	let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1115
1116	// Check that the proposal contains our inputs.
1117	let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1118	for vtxo_req in vtxo_tree.iter_vtxos() {
1119		if let Some(i) = my_vtxos.iter().position(|v| {
1120			v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1121		}) {
1122			my_vtxos.swap_remove(i);
1123		}
1124	}
1125	if !my_vtxos.is_empty() {
1126		bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1127	}
1128
1129	let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1130	let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1131	trace!("Sending vtxo signatures to server...");
1132	let _ = try_join_all(iter.map(|((req, key), sec)| async {
1133		let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1134		let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1135		let part_sigs = unsigned_vtxos.cosign_branch(
1136			&cosign_agg_nonces, leaf_idx, key, secret_nonces,
1137		).context("failed to cosign branch: our request not part of tree")?;
1138
1139		info!("Sending {} partial vtxo cosign signatures for pk {}",
1140			part_sigs.len(), key.public_key(),
1141		);
1142
1143		let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1144			pubkey: key.public_key().serialize().to_vec(),
1145			signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1146		}).await.context("error sending vtxo signatures")?;
1147
1148		Result::<(), anyhow::Error>::Ok(())
1149	})).await.context("error sending VTXO signatures")?;
1150	trace!("Done sending vtxo signatures to server");
1151
1152	Ok(())
1153}
1154
1155async fn construct_new_vtxos(
1156	participation: &RoundParticipation,
1157	unsigned_round_tx: &Transaction,
1158	vtxo_tree: &VtxoTreeSpec,
1159	vtxo_cosign_sigs: &[schnorr::Signature],
1160) -> anyhow::Result<Vec<Vtxo<Full>>> {
1161	let round_txid = unsigned_round_tx.compute_txid();
1162	let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1163	let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1164
1165	// Validate the vtxo tree and cosign signatures.
1166	if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1167		// bad server!
1168		bail!("Received incorrect vtxo cosign signatures from server");
1169	}
1170
1171	let signed_vtxos = vtxo_tree
1172		.into_signed_tree(vtxo_cosign_sigs.to_vec())
1173		.into_cached_tree();
1174
1175	let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1176	let total_nb_expected_vtxos = expected_vtxos.len();
1177
1178	let mut new_vtxos = vec![];
1179	for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1180		if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1181			let vtxo = signed_vtxos.build_vtxo(idx);
1182
1183			// validate the received vtxos
1184			// This is more like a sanity check since we crafted them ourselves.
1185			check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1186				.context("constructed invalid vtxo from tree")?;
1187
1188			info!("New VTXO from round: {} ({}, {})",
1189				vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1190			);
1191
1192			new_vtxos.push(vtxo);
1193			expected_vtxos.swap_remove(expected_idx);
1194		}
1195	}
1196
1197	if !expected_vtxos.is_empty() {
1198		if expected_vtxos.len() == total_nb_expected_vtxos {
1199			// we must have done something wrong
1200			bail!("None of our VTXOs were present in round!");
1201		} else {
1202			bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1203				expected_vtxos.len(), expected_vtxos,
1204			);
1205		}
1206	}
1207	Ok(new_vtxos)
1208}
1209
1210//TODO(stevenroose) should be made idempotent
1211async fn persist_round_success(
1212	wallet: &Wallet,
1213	participation: &RoundParticipation,
1214	movement_id: Option<MovementId>,
1215	new_vtxos: &[Vtxo<Full>],
1216	funding_tx: &Transaction,
1217) -> anyhow::Result<()> {
1218	debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1219		new_vtxos.len(), movement_id,
1220	);
1221
1222	// we first try all actions that need to happen and only afterwards return errors
1223	// so that we achieve maximum success
1224
1225	let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1226		.context("failed to store new VTXOs");
1227	let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1228		.context("failed to mark input VTXOs as spent");
1229	let update_result = if let Some(mid) = movement_id {
1230		wallet.inner.movements.finish_movement_with_update(
1231			mid,
1232			MovementStatus::Successful,
1233			MovementUpdate::new()
1234				.produced_vtxos(new_vtxos)
1235				.metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1236		).await.context("failed to mark movement as finished")
1237	} else {
1238		Ok(())
1239	};
1240
1241	store_result?;
1242	spent_result?;
1243	update_result?;
1244
1245	Ok(())
1246}
1247
1248async fn persist_round_failure(
1249	wallet: &Wallet,
1250	participation: &RoundParticipation,
1251	movement_id: Option<MovementId>,
1252) -> anyhow::Result<()> {
1253	debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1254	let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1255	let finish_result = if let Some(movement_id) = movement_id {
1256		wallet.inner.movements.finish_movement(movement_id, MovementStatus::Failed).await
1257	} else {
1258		Ok(())
1259	};
1260	if let Err(e) = &finish_result {
1261		error!("Failed to mark movement as failed: {:#}", e);
1262	}
1263	match (unlock_result, finish_result) {
1264		(Ok(()), Ok(())) => Ok(()),
1265		(Err(e), _) => Err(e),
1266		(_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1267	}
1268}
1269
1270async fn update_funding_txid(
1271	wallet: &Wallet,
1272	movement_id: MovementId,
1273	funding_txid: Txid,
1274) -> anyhow::Result<()> {
1275	wallet.inner.movements.update_movement(
1276		movement_id,
1277		MovementUpdate::new()
1278			.metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1279	).await.context("Unable to update funding txid of round")
1280}
1281
1282impl Wallet {
1283	/// Load and lock a single given round state (by id), waiting for the lock.
1284	///
1285	/// Returns `Some(state)` if the round state is found and locked, `None`
1286	/// if it is not found after acquiring the lock.
1287	pub async fn lock_wait_round_state(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState>> {
1288		let guard = self.inner.lock_manager.lock(
1289			&format!("{}.round.{}", self.fingerprint(), id),
1290			ROUND_LOCK_TIMEOUT,
1291		).await.with_context(|| format!(
1292			"timed out waiting for lock on round state {} (wallet {})",
1293			id, self.fingerprint(),
1294		))?;
1295
1296		if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1297			return Ok(Some(state.lock(guard)));
1298		}
1299
1300		Ok(None)
1301	}
1302
1303	/// Ask the server when the next round is scheduled to start
1304	pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1305		let (mut srv, _) = self.require_server().await?;
1306		let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1307		Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1308	}
1309
1310	/// Start a new round participation
1311	///
1312	/// This function will store the state in the db and mark the VTXOs as locked.
1313	///
1314	/// ### Return
1315	///
1316	/// - By default, the returned state will be locked to prevent race conditions.
1317	/// To unlock the state, [StoredRoundState::unlock()] can be called.
1318	pub async fn join_next_round(
1319		&self,
1320		participation: RoundParticipation,
1321		movement_kind: Option<RoundMovement>,
1322	) -> anyhow::Result<StoredRoundState> {
1323		let movement_id = if let Some(kind) = movement_kind {
1324			Some(self.inner.movements.new_movement_with_update(
1325				Subsystem::ROUND,
1326				kind.to_string(),
1327				participation.to_movement_update()?
1328			).await?)
1329		} else {
1330			None
1331		};
1332		let state = RoundState::new_interactive(participation, movement_id);
1333
1334		let id = self.inner.db.store_round_state_lock_vtxos(&state).await?;
1335		let state = self.lock_wait_round_state(id).await?
1336			.context("failed to lock fresh round state")?;
1337
1338		Ok(state)
1339	}
1340
1341	/// Join next round in non-interactive or delegated mode
1342	pub async fn join_next_round_delegated(
1343		&self,
1344		participation: RoundParticipation,
1345		movement_kind: Option<RoundMovement>,
1346	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1347		let (mut srv, _) = self.require_server().await?;
1348
1349		let movement_id = if let Some(kind) = movement_kind {
1350			Some(self.inner.movements.new_movement_with_update(
1351				Subsystem::ROUND, kind.to_string(), participation.to_movement_update()?,
1352			).await?)
1353		} else {
1354			None
1355		};
1356
1357		// Get mailbox identifier for VTXO delivery
1358		let unblinded_mailbox_id = self.mailbox_identifier();
1359
1360		// Register VTXO transaction chains with server before round participation
1361		self.register_vtxo_transactions_with_server(&participation.inputs).await
1362			.context("failed to register input vtxo transactions with server")?;
1363
1364		// Generate attestations for input vtxos
1365		let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1366		for vtxo in participation.inputs.iter() {
1367			let keypair = self.get_vtxo_key(vtxo).await
1368				.context("failed to get vtxo keypair")?;
1369			input_vtxos.push(protos::InputVtxo {
1370				vtxo_id: vtxo.id().to_bytes().to_vec(),
1371				attestation: {
1372					let attestation = DelegatedRoundParticipationAttestation::new(
1373						vtxo.id(), &participation.outputs, &keypair,
1374					);
1375					attestation.serialize()
1376				},
1377			});
1378		}
1379
1380		// Build proto VtxoRequests
1381		let vtxo_requests = participation.outputs.iter()
1382			.map(|req|
1383				protos::VtxoRequest {
1384					policy: req.policy.serialize(),
1385					amount: req.amount.to_sat(),
1386			})
1387			.collect::<Vec<_>>();
1388
1389		// Submit participation to server and get unlock_hash
1390		let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1391			input_vtxos,
1392			vtxo_requests,
1393			unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
1394		}).await.context("error submitting round participation to server")?.into_inner();
1395
1396		let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1397			.context("invalid unlock hash from server")?;
1398
1399		let state = RoundState::new_non_interactive(participation, unlock_hash, movement_id);
1400
1401		info!("Non-interactive round participation submitted, it will automatically execute \
1402			when you next sync your wallet after the round happened \
1403			(and has sufficient confirmations).",
1404		);
1405
1406		let id = self.inner.db.store_round_state_lock_vtxos(&state).await?;
1407		Ok(StoredRoundState::new(id, state))
1408	}
1409
1410	/// Get all pending round states
1411	pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1412		self.inner.db.get_pending_round_state_ids().await
1413	}
1414
1415	/// Get all pending round states
1416	pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1417		let ids = self.inner.db.get_pending_round_state_ids().await?;
1418		let mut states = Vec::with_capacity(ids.len());
1419		for id in ids {
1420			if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1421				states.push(state);
1422			}
1423		}
1424		Ok(states)
1425	}
1426
1427	/// Balance locked in pending rounds
1428	pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1429		let mut ret = Amount::ZERO;
1430		for round in self.pending_round_states().await? {
1431			ret += round.state().pending_balance();
1432		}
1433		Ok(ret)
1434	}
1435
1436	/// Returns all VTXOs that are locked in a pending round
1437	///
1438	/// This excludes all input VTXOs for which the output VTXOs have already
1439	/// been created.
1440	pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1441		let mut ret = Vec::new();
1442		for round in self.pending_round_states().await? {
1443			let inputs = round.state().locked_pending_inputs();
1444			ret.reserve(inputs.len());
1445			for input in inputs {
1446				let v = self.get_vtxo_by_id(input.id()).await
1447					.context("unknown round input VTXO")?;
1448				ret.push(v);
1449			}
1450		}
1451		Ok(ret)
1452	}
1453
1454	/// Sync pending rounds that have finished but are waiting for confirmations
1455	pub async fn sync_pending_rounds(&self) -> anyhow::Result<HashMap<RoundStateId, RoundStatus>> {
1456		let states = self.pending_round_states().await?;
1457		if states.is_empty() {
1458			return Ok(HashMap::new());
1459		}
1460
1461		debug!("Syncing {} pending round states...", states.len());
1462
1463		let ret = Arc::new(parking_lot::Mutex::new(HashMap::with_capacity(states.len())));
1464		tokio_stream::iter(states).for_each_concurrent(10, |state| {
1465			let ret = ret.clone();
1466			async move {
1467				// not processing events here
1468				if state.state().ongoing_participation() {
1469					return;
1470				}
1471
1472				let mut state = match self.lock_wait_round_state(state.id()).await {
1473					Ok(Some(state)) => state,
1474					Ok(None) => return,
1475					Err(e) => {
1476						warn!("Error locking round state: {:#}", e);
1477						return;
1478					},
1479				};
1480
1481				let status = match state.state_mut().sync(self).await {
1482					Ok(s) => s,
1483					Err(e) => {
1484						warn!("Error syncing round: {:#}", e);
1485						return;
1486					},
1487				};
1488				trace!("Synced round #{}, status: {:?}", state.id(), status);
1489				match status {
1490					RoundStatus::Confirmed { funding_txid } => {
1491						info!("Round confirmed. Funding tx {}", funding_txid);
1492						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1493							warn!("Error removing confirmed round state from db: {:#}", e);
1494						}
1495					},
1496					RoundStatus::Unconfirmed { funding_txid } => {
1497						info!("Waiting for confirmations for round funding tx {}", funding_txid);
1498						if let Err(e) = self.inner.db.update_round_state(&state).await {
1499							warn!("Error updating pending round state in db: {:#}", e);
1500						}
1501					},
1502					RoundStatus::Pending => {
1503						if let Err(e) = self.inner.db.update_round_state(&state).await {
1504							warn!("Error updating pending round state in db: {:#}", e);
1505						}
1506					},
1507					RoundStatus::Failed { ref error } => {
1508						error!("Round failed: {}", error);
1509						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1510							warn!("Error removing failed round state from db: {:#}", e);
1511						}
1512					},
1513					RoundStatus::Canceled => {
1514						error!("Round canceled");
1515						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1516							warn!("Error removing canceled round state from db: {:#}", e);
1517						}
1518					},
1519				}
1520				ret.lock().insert(state.id(), status);
1521			}
1522		}).await;
1523
1524		Ok(Arc::into_inner(ret).expect("only ref left").into_inner())
1525	}
1526
1527	/// Fetch last round event from server
1528	async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1529		let (mut srv, _) = self.require_server().await?;
1530		let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1531		Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1532	}
1533
1534	async fn inner_process_event(
1535		&self,
1536		state: &mut StoredRoundState,
1537		event: Option<&RoundEvent>,
1538	) {
1539		if let Some(event) = event && state.state().ongoing_participation() {
1540			let updated = state.state_mut().process_event(self, &event).await;
1541			if updated {
1542				if let Err(e) = self.inner.db.update_round_state(&state).await {
1543					error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1544				}
1545			}
1546		}
1547
1548		match state.state_mut().sync(self).await {
1549			Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1550			Ok(s) if s.is_final() => {
1551				info!("Round #{} finished with result: {:?}", state.id(), s);
1552				if let Err(e) = self.inner.db.remove_round_state(&state).await {
1553					warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1554				}
1555			},
1556			Ok(s) => {
1557				trace!("Round state #{} is now in state {:?}", state.id(), s);
1558				if let Err(e) = self.inner.db.update_round_state(&state).await {
1559					warn!("Error storing round state #{}: {:#}", state.id(), e);
1560				}
1561			},
1562		}
1563	}
1564
1565	/// Try to make incremental progress on all pending round states
1566	///
1567	/// If the `last_round_event` argument is not provided, it will be fetched
1568	/// from the server.
1569	pub async fn progress_pending_rounds(
1570		&self,
1571		last_round_event: Option<&RoundEvent>,
1572	) -> anyhow::Result<()> {
1573		let states = self.pending_round_states().await?;
1574		if states.is_empty() {
1575			return Ok(());
1576		}
1577
1578		info!("Processing {} rounds...", states.len());
1579
1580		let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1581
1582		let has_ongoing_participation = states.iter()
1583			.any(|s| s.state().ongoing_participation());
1584		if has_ongoing_participation && last_round_event.is_none() {
1585			match self.get_last_round_event().await {
1586				Ok(e) => last_round_event = Some(Cow::Owned(e)),
1587				Err(e) => {
1588					warn!("Error fetching round event, \
1589						failed to progress ongoing rounds: {:#}", e);
1590				},
1591			}
1592		}
1593
1594		let event = last_round_event.as_ref().map(|c| c.as_ref());
1595
1596		let futs = states.into_iter().map(async |state| {
1597			let locked = self.lock_wait_round_state(state.id()).await?;
1598			if let Some(mut locked) = locked {
1599				self.inner_process_event(&mut locked, event).await;
1600			}
1601			Ok::<_, anyhow::Error>(())
1602		});
1603
1604		futures::future::join_all(futs).await;
1605
1606		Ok(())
1607	}
1608
1609	pub async fn subscribe_round_events(&self)
1610		-> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1611	{
1612		let (mut srv, _) = self.require_server().await?;
1613		let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1614			.into_inner().map(|m| {
1615				let m = m.context("received error on event stream")?;
1616				let e = RoundEvent::try_from(m.clone())
1617					.with_context(|| format!("error converting rpc round event: {:?}", m))?;
1618				trace!("Received round event: {}", e);
1619				Ok::<_, anyhow::Error>(e)
1620			});
1621		Ok(events)
1622	}
1623
1624	/// A blocking call that will try to perform a full round participation
1625	/// for all ongoing rounds
1626	///
1627	/// Returns only once there is no ongoing rounds anymore.
1628	pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1629		let mut events = self.subscribe_round_events().await?;
1630
1631		loop {
1632			// NB: we need to load all ongoing rounds on every iteration here
1633			// because some might be finished by another call
1634			let state_ids = self.pending_round_states().await?.iter()
1635				.filter(|s| s.state().ongoing_participation())
1636				.map(|s| s.id())
1637				.collect::<Vec<_>>();
1638
1639			if state_ids.is_empty() {
1640				info!("All rounds handled");
1641				return Ok(());
1642			}
1643
1644			let event = events.next().await
1645				.context("events stream broke")?
1646				.context("error on event stream")?;
1647
1648			let futs = state_ids.into_iter().map(async |state| {
1649				let locked = self.lock_wait_round_state(state).await?;
1650				if let Some(mut locked) = locked {
1651					self.inner_process_event(&mut locked, Some(&event)).await;
1652				}
1653				Ok::<_, anyhow::Error>(())
1654			});
1655
1656			futures::future::join_all(futs).await;
1657		}
1658	}
1659
1660	/// Will cancel all pending rounds that can safely be canceled
1661	///
1662	/// All rounds that have not started yet can safely be canceled,
1663	/// as well as rounds where we have not yet signed any forfeit txs.
1664	pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1665		// initial load to get all pending round states ids
1666		let state_ids = self.inner.db.get_pending_round_state_ids().await?;
1667
1668		let futures = state_ids.into_iter().map(|state_id| {
1669			async move {
1670				// wait for lock and load again to ensure most recent state
1671				let mut state = match self.lock_wait_round_state(state_id).await {
1672					Ok(Some(s)) => s,
1673					Ok(None) => return,
1674					Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1675				};
1676
1677				match state.state_mut().try_cancel(self).await {
1678					Ok(true) => {
1679						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1680							warn!("Error removing canceled round state from db: {:#}", e);
1681						}
1682					},
1683					Ok(false) => {},
1684					Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1685				}
1686			}
1687		});
1688
1689		join_all(futures).await;
1690
1691		Ok(())
1692	}
1693
1694	/// Try to cancel the given round
1695	pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1696		let mut state = self.lock_wait_round_state(id).await?
1697			.context("round state not found")?;
1698
1699		if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1700			self.inner.db.remove_round_state(&state).await
1701				.context("error removing canceled round state from db")?;
1702		} else {
1703			bail!("failed to cancel round");
1704		}
1705
1706		Ok(())
1707	}
1708
1709	/// Participate in a round
1710	///
1711	/// This function will start a new round participation and block until
1712	/// the round is finished.
1713	/// After this method returns the round state will be kept active until
1714	/// the round tx fully confirms.
1715	pub(crate) async fn participate_round(
1716		&self,
1717		participation: RoundParticipation,
1718		movement_kind: Option<RoundMovement>,
1719	) -> anyhow::Result<RoundStatus> {
1720		let mut state = self.join_next_round(participation, movement_kind).await?;
1721
1722		info!("Waiting for a round start...");
1723		let mut events = self.subscribe_round_events().await?;
1724
1725		loop {
1726			if !state.state().ongoing_participation() {
1727				let status = state.state_mut().sync(self).await?;
1728				match status {
1729					RoundStatus::Failed { error } => bail!("round failed: {}", error),
1730					RoundStatus::Canceled => bail!("round canceled"),
1731					status => return Ok(status),
1732				}
1733			}
1734
1735			let event = events.next().await
1736				.context("events stream broke")?
1737				.context("error on event stream")?;
1738			if state.state_mut().process_event(self, &event).await {
1739				self.inner.db.update_round_state(&state).await?;
1740			}
1741		}
1742	}
1743}