Skip to main content

bark/round/
mod.rs

1//!
2//! Round State Machine
3//!
4
5use std::collections::HashMap;
6use std::iter;
7use std::borrow::Cow;
8use std::convert::Infallible;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use anyhow::Context;
13use ark::vtxo::VtxoValidationError;
14use bdk_esplora::esplora_client::Amount;
15use bip39::rand;
16use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
17use bitcoin::consensus::encode::{deserialize, serialize_hex};
18use bitcoin::hashes::Hash;
19use bitcoin::hex::DisplayHex;
20use bitcoin::key::Keypair;
21use bitcoin::secp256k1::schnorr;
22use futures::future::{join_all, try_join_all};
23use futures::{Stream, StreamExt};
24use log::{debug, error, info, trace, warn};
25
26use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
27use ark::vtxo::Full;
28use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
29use ark::forfeit::HashLockedForfeitBundle;
30use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
31use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
32use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
33use bitcoin_ext::TxStatus;
34use server_rpc::{protos, ServerConnection, TryFromBytes};
35
36use crate::movement::manager::OnDropStatus;
37use crate::{SECP, Wallet, WalletVtxo};
38use crate::movement::{MovementId, MovementStatus};
39use crate::movement::update::MovementUpdate;
40use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
41
/// How long [`Wallet::lock_wait_round_state`] waits for a contended
/// round lock before giving up. Long enough to outlast a normal round.
//
// NOTE(review): nothing in this file establishes that 10 seconds exceeds the
// duration of a normal round -- confirm against the server's round timing.
const ROUND_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
45use crate::subsystem::{RoundMovement, Subsystem};
46
47
/// The type string for the hArk leaf transition
///
/// Compared against the `transition_kind` of a
/// [`VtxoValidationError::GenesisTransition`] to recognise a new VTXO whose
/// only validation failure is its last (hash-locked) genesis transition.
const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
50
/// Struct to communicate your specific participation for an Ark round.
///
/// Serializable so that it can be stored alongside the round state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoundParticipation {
	/// The input VTXOs that are consumed by this participation
	#[serde(with = "ark::encode::serde::vec")]
	pub inputs: Vec<Vtxo<Full>>,
	/// The output VTXOs that we request in the round,
	/// including change
	pub outputs: Vec<VtxoRequest>,
	/// Optional mailbox identifier for round completion notification
	///
	/// Omitted from the serialized form when `None` and defaults to `None`
	/// when absent on deserialization.
	#[serde(default, skip_serializing_if = "Option::is_none", with = "ark::encode::serde::opt")]
	pub unblinded_mailbox_id: Option<ark::mailbox::MailboxIdentifier>,
}
63
64impl RoundParticipation {
65	pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
66		let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
67		let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
68		let fee = input_amount - output_amount;
69		Ok(MovementUpdate::new()
70			.consumed_vtxos(&self.inputs)
71			.intended_balance(SignedAmount::ZERO)
72			.effective_balance( - fee.to_signed()?)
73			.fee(fee)
74		)
75	}
76}
77
/// The status of a round participation as reported by [`RoundState::sync`]
#[derive(Debug, Clone)]
pub enum RoundStatus {
	/// The round was successful and is fully confirmed
	Confirmed {
		/// The txid of the round funding transaction
		funding_txid: Txid,
	},
	/// Round successful but not fully confirmed
	Unconfirmed {
		/// The txid of the round funding transaction
		funding_txid: Txid,
	},
	/// Round didn't finish yet
	Pending,
	/// The round failed
	Failed {
		/// Human-readable description of the failure
		error: String,
	},
	/// User canceled the round
	Canceled,
}
97
98impl RoundStatus {
99	/// Whether this is the final state and it won't change anymore
100	pub fn is_final(&self) -> bool {
101		match self {
102			Self::Confirmed { .. } => true,
103			Self::Unconfirmed { .. } => false,
104			Self::Pending => false,
105			Self::Failed { .. } => true,
106			Self::Canceled => true,
107		}
108	}
109
110	/// Whether it looks like the round succeeded
111	pub fn is_success(&self) -> bool {
112		match self {
113			Self::Confirmed { .. } => true,
114			Self::Unconfirmed { .. } => true,
115			Self::Pending => false,
116			Self::Failed { .. } => false,
117			Self::Canceled => false,
118		}
119	}
120}
121
/// State of the progress of a round participation
///
/// An instance of this struct is kept all the way from the intention of joining
/// the next round, until either the round fully confirms or it fails and we are
/// sure it won't have any effect on our wallet.
///
/// As soon as we have signed forfeit txs for the round, we keep track of this
/// round attempt until we see another attempt we participated in confirm or
/// we gain confidence that the failed attempt will never confirm.
//
//TODO(stevenroose) move the id in here and have the state persist itself with the wallet
// to have better control. this way we can touch db before we sent forfeit sigs
pub struct RoundState {
	/// Round is fully done
	///
	/// Set by [`RoundState::sync`] after the successful round was persisted.
	pub(crate) done: bool,

	/// Our participation in this round
	pub(crate) participation: RoundParticipation,

	/// The flow of the round in case it is still ongoing with the server
	pub(crate) flow: RoundFlowState,

	/// The new output vtxos of this round participation
	///
	/// After we finish the interactive part, we fill this with the uncompleted
	/// VTXOs which we then try to complete with the unlock preimage.
	pub(crate) new_vtxos: Vec<Vtxo<Full>>,

	/// Whether we sent our forfeit signatures to the server
	///
	/// If we did this and the server refused to reveal our new VTXOs,
	/// we will be forced to exit.
	//TODO(stevenroose) implement exit when this is true and we can't make progress
	// probably based on the input vtxos becoming close to expiry
	pub(crate) sent_forfeit_sigs: bool,

	/// The ID of the [Movement] associated with this round
	///
	/// When `None`, no funding txid updates are written for a movement.
	pub(crate) movement_id: Option<MovementId>,
}
161
162impl RoundState {
163	fn new_interactive(
164		participation: RoundParticipation,
165		movement_id: Option<MovementId>,
166	) -> Self {
167		Self {
168			participation,
169			movement_id,
170			flow: RoundFlowState::InteractivePending,
171			new_vtxos: Vec::new(),
172			sent_forfeit_sigs: false,
173			done: false,
174		}
175	}
176
177	fn new_delegated(
178		participation: RoundParticipation,
179		unlock_hash: UnlockHash,
180		movement_id: Option<MovementId>,
181	) -> Self {
182		Self {
183			participation,
184			movement_id,
185			flow: RoundFlowState::NonInteractivePending { unlock_hash },
186			new_vtxos: Vec::new(),
187			sent_forfeit_sigs: false,
188			done: false,
189		}
190	}
191
	/// Our participation in this round
	///
	/// Returns a borrow of the inputs and requested outputs this state was
	/// created with.
	pub fn participation(&self) -> &RoundParticipation {
		&self.participation
	}
196
197	/// the unlock hash if already known
198	pub fn unlock_hash(&self) -> Option<UnlockHash> {
199		match self.flow {
200			RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
201			RoundFlowState::InteractivePending => None,
202			RoundFlowState::InteractiveOngoing { .. } => None,
203			RoundFlowState::Failed { .. } => None,
204			RoundFlowState::Canceled => None,
205			RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
206		}
207	}
208
209	pub fn funding_tx(&self) -> Option<&Transaction> {
210		match self.flow {
211			RoundFlowState::NonInteractivePending { .. } => None,
212			RoundFlowState::InteractivePending => None,
213			RoundFlowState::InteractiveOngoing { .. } => None,
214			RoundFlowState::Failed { .. } => None,
215			RoundFlowState::Canceled => None,
216			RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
217		}
218	}
219
220	/// Whether the interactive part of the round is still ongoing
221	pub fn ongoing_participation(&self) -> bool {
222		match self.flow {
223			RoundFlowState::NonInteractivePending { .. } => false,
224			RoundFlowState::InteractivePending => true,
225			RoundFlowState::InteractiveOngoing { .. } => true,
226			RoundFlowState::Failed { .. } => false,
227			RoundFlowState::Canceled => false,
228			RoundFlowState::Finished { .. } => false,
229		}
230	}
231
232	/// Tries to cancel the round and returns whether it was succesfully canceled
233	/// or if it was already canceled or failed
234	pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
235		let ret = match self.flow {
236			RoundFlowState::NonInteractivePending { .. } => {
237				//TODO(stevenroose) we have to cancel with server
238				bail!("it is currently not yet possible to cancel pending delegated rounds");
239			},
240			RoundFlowState::Canceled => true,
241			RoundFlowState::Failed { .. } => true,
242			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
243				self.flow = RoundFlowState::Canceled;
244				true
245			},
246			RoundFlowState::Finished { .. } => false,
247		};
248		if ret {
249			persist_round_failure(wallet, &self.participation, self.movement_id).await
250				.context("failed to persist round failure for cancelation")?;
251		}
252		Ok(ret)
253	}
254
255	async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
256		match start_attempt(wallet, &self.participation, attempt).await {
257			Ok(state) => {
258				self.flow = RoundFlowState::InteractiveOngoing {
259					round_seq: attempt.round_seq,
260					attempt_seq: attempt.attempt_seq,
261					state: state,
262				};
263			},
264			Err(e) => {
265				self.flow = RoundFlowState::Failed {
266					error: format!("{:#}", e),
267				};
268			},
269		}
270	}
271
	/// Processes the given event and returns true if some update was made to the state
	///
	/// Only the interactive flow states react to events:
	/// - while pending, only the first attempt (attempt seq 0) of a round is joined;
	/// - while ongoing, the event either fails the round, starts a newer
	///   attempt, or is handed to [`progress_attempt`].
	///
	/// All other flow states ignore the event and return false.
	pub async fn process_event(
		&mut self,
		wallet: &Wallet,
		event: &RoundEvent,
	) -> bool {
		// Every arm below returns from the function, so the match itself never
		// yields a value; the `Infallible` annotation lets the compiler enforce that.
		let _: Infallible = match self.flow {
			RoundFlowState::InteractivePending => {
				if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
					trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
					self.try_start_attempt(wallet, e).await;
					return true;
				} else {
					trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
						event.kind(), event.round_seq(), event.attempt_seq(),
					);
					return false;
				}
			},
			RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
				// here we catch the cases where we're in a wrong flow

				// the server explicitly failed the round we are in
				if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
					warn!("Round {} failed by server", round_seq);
					self.flow = RoundFlowState::Failed {
						error: format!("round {} failed by server", round_seq),
					};
					return true;
				}

				if event.round_seq() > round_seq {
					// new round started, we don't support multiple parallel rounds,
					// this means we failed
					self.flow = RoundFlowState::Failed {
						error: format!("round {} started while we were on {}",
							event.round_seq(), round_seq,
						),
					};
					return true;
				}

				if event.attempt_seq() < attempt_seq {
					trace!("ignoring replayed message from old attempt");
					return false;
				}

				// a newer attempt replaces the attempt state we had so far
				if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
					trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
					self.try_start_attempt(wallet, e).await;
					return true;
				}
				trace!("Processing event {} for round attempt {}:{} in state {}",
					event.kind(), round_seq, attempt_seq, state.kind(),
				);

				return match progress_attempt(state, wallet, &self.participation, event).await {
					AttemptProgressResult::NotUpdated => false,
					AttemptProgressResult::Updated { new_state } => {
						*state = new_state;
						true
					},
					AttemptProgressResult::Failed(e) => {
						warn!("Round failed with error: {:#}", e);
						self.flow = RoundFlowState::Failed {
							error: format!("{:#}", e),
						};
						true
					},
					AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
						self.new_vtxos = vtxos;
						let funding_txid = funding_tx.compute_txid();
						self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
						// best effort: a failure to record the funding txid on the
						// movement is only logged, the state update still counts
						if let Some(mid) = self.movement_id {
							if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
								warn!("Error updating the round funding txid: {:#}", e);
							}
						}
						true
					},
				};
			},
			RoundFlowState::NonInteractivePending { .. }
				| RoundFlowState::Finished { .. }
				| RoundFlowState::Failed { .. }
				| RoundFlowState::Canceled => return false,
		};
	}
359
	/// Sync the round's status and return it
	///
	/// When success or failure is returned, the round state can be eliminated
	///
	/// Depending on the flow state this either just reports the status, persists
	/// a failure, polls a delegated round via [`progress_delegated`], or checks
	/// the funding tx confirmations and runs [`hark_vtxo_swap`].
	///
	/// Returns an error when persisting fails, when the chain source or server
	/// calls fail, or when the forfeit protocol fails. If the failure happened
	/// after forfeit signatures were sent, `sent_forfeit_sigs` is set before the
	/// error is returned.
	//TODO(stevenroose) make RoundState manage its own db record
	pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
		match self.flow {
			// NB this guarded arm must stay before the unguarded `Finished`
			// arm at the bottom, which handles the not-yet-done case
			RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
				Ok(RoundStatus::Confirmed {
					funding_txid: funding_tx.compute_txid(),
				})
			},

			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
				Ok(RoundStatus::Pending)
			},
			// the failure is persisted on every sync of a failed or canceled round
			RoundFlowState::Failed { ref error } => {
				persist_round_failure(wallet, &self.participation, self.movement_id).await
					.context("failed to persist round failure")?;
				Ok(RoundStatus::Failed { error: error.clone() })
			},
			RoundFlowState::Canceled => {
				persist_round_failure(wallet, &self.participation, self.movement_id).await
					.context("failed to persist round failure")?;
				Ok(RoundStatus::Canceled)
			},

			RoundFlowState::NonInteractivePending { unlock_hash } => {
				match progress_delegated(wallet, &self.participation, unlock_hash).await {
					Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
					Ok(HarkProgressResult::RoundNotFound) => {
						self.handle_round_not_found(wallet).await
					},
					Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
						let funding_txid = funding_tx.compute_txid();
						self.new_vtxos = new_vtxos;
						self.flow = RoundFlowState::Finished {
							funding_tx: funding_tx.clone(),
							unlock_hash: unlock_hash,
						};

						persist_round_success(
							wallet,
							&self.participation,
							self.movement_id,
							&self.new_vtxos,
							&funding_tx,
						).await.context("failed to store successful round in DB!")?;

						// only mark done once the success has been persisted
						self.done = true;

						Ok(RoundStatus::Confirmed { funding_txid })
					},
					Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
						if let Some(mid) = self.movement_id {
							update_funding_txid(wallet, mid, funding_txid).await
								.context("failed to update funding txid in DB")?;
						}
						Ok(RoundStatus::Unconfirmed { funding_txid })
					},

					//TODO(stevenroose) should we mark as failed for these cases?

					Err(HarkForfeitError::Err(e)) => {
						//TODO(stevenroose) we failed here but we might actualy be able to
						// succeed if we retry. should we implement some kind of limited
						// retry after which we mark as failed?
						Err(e.context("error progressing delegated round"))
					},
					Err(HarkForfeitError::SentForfeits(e)) => {
						self.sent_forfeit_sigs = true;
						Err(e.context("error progressing delegated round \
							after sending forfeit tx signatures"))
					},
				}
			},
			// interactive part finished, but didn't forfeit yet
			RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
				let funding_txid = funding_tx.compute_txid();
				let confirmed = check_funding_tx_confirmations(
					wallet, funding_txid, &funding_tx,
				).await.context("error checking funding tx confirmations")?;
				if !confirmed {
					trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
					return Ok(RoundStatus::Unconfirmed { funding_txid });
				}

				match hark_vtxo_swap(
					wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
				).await {
					Ok(()) => {
						persist_round_success(
							wallet,
							&self.participation,
							self.movement_id,
							&self.new_vtxos,
							&funding_tx,
						).await.context("failed to store successful round in DB!")?;

						// only mark done once the success has been persisted
						self.done = true;

						Ok(RoundStatus::Confirmed { funding_txid })
					},
					Err(HarkForfeitError::Err(e)) => {
						Err(e.context("error forfeiting VTXOs after round"))
					},
					Err(HarkForfeitError::SentForfeits(e)) => {
						self.sent_forfeit_sigs = true;
						Err(e.context("error after having signed and sent \
							forfeit signatures to server"))
					},
				}
			},
		}
	}
474
475	/// Once we know the signed round funding tx, this returns the output VTXOs
476	/// for this round.
477	pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
478		if self.new_vtxos.is_empty() {
479			None
480		} else {
481			Some(&self.new_vtxos)
482		}
483	}
484
485	/// Returns the input VTXOs that are locked in this round, but only
486	/// if no output VTXOs were issued yet.
487	pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
488		//TODO(stevenroose) consider if we can't just drop the state after forfeit exchange
489		match self.flow {
490			RoundFlowState::NonInteractivePending { .. }
491				| RoundFlowState::InteractivePending
492				| RoundFlowState::InteractiveOngoing { .. }
493			=> {
494				&self.participation.inputs
495			},
496			RoundFlowState::Finished { .. } => if self.done {
497				// inputs already unlocked
498				&[]
499			} else {
500				&self.participation.inputs
501			},
502			RoundFlowState::Failed { .. }
503				| RoundFlowState::Canceled
504			=> {
505				// inputs already unlocked
506				&[]
507			},
508		}
509	}
510
511	/// The balance pending in this round
512	///
513	/// This becomes zero once the new round VTXOs are unlocked.
514	pub fn pending_balance(&self) -> Amount {
515		if self.done {
516			return Amount::ZERO;
517		}
518
519		match self.flow {
520			RoundFlowState::NonInteractivePending { .. }
521				| RoundFlowState::InteractivePending
522				| RoundFlowState::InteractiveOngoing { .. }
523				| RoundFlowState::Finished { .. }
524			=> {
525				self.participation.outputs.iter().map(|o| o.amount).sum()
526			},
527			RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
528				Amount::ZERO
529			},
530		}
531	}
532
533	/// Handle the case where the server reports our round participation as not found.
534	///
535	/// If we sent forfeit signatures (which only happens after the round was
536	/// confirmed), this is adversarial — trigger unilateral exit.
537	/// If we never sent forfeits, the server can't steal, so unlock the VTXOs
538	/// and verify with the server that they're still considered spendable.
539	async fn handle_round_not_found(
540		&mut self,
541		wallet: &Wallet,
542	) -> anyhow::Result<RoundStatus> {
543		info!("Server reports round participation not found (no forfeits sent)");
544		self.flow = RoundFlowState::Failed {
545			error: "server reports round participation not found".into(),
546		};
547		persist_round_failure(wallet, &self.participation, self.movement_id).await
548			.context("failed to persist round failure")?;
549
550		Ok(RoundStatus::Failed {
551			error: "server reports round participation not found".into(),
552		})
553	}
554}
555
/// The state of the process flow of a round
///
/// This tracks the progress of the interactive part of the round, from
/// waiting to start until finishing either successfully or with a failure.
pub enum RoundFlowState {
	/// We don't do flow and we just wait for the round to finish
	NonInteractivePending {
		/// The unlock hash identifying our delegated participation
		unlock_hash: UnlockHash,
	},

	/// Waiting for round to happen
	InteractivePending,
	/// Interactive part ongoing
	InteractiveOngoing {
		/// Sequence number of the round we joined
		round_seq: RoundSeq,
		/// Sequence number of the attempt within that round we joined
		attempt_seq: usize,
		/// Our progress within that attempt
		state: AttemptState,
	},

	/// Interactive part finished, waiting for confirmation
	Finished {
		/// The round funding transaction
		funding_tx: Transaction,
		/// The unlock hash of the new VTXOs
		unlock_hash: UnlockHash,
	},

	/// Failed during round
	Failed {
		/// Human-readable description of the failure
		error: String,
	},

	/// User canceled round
	Canceled,
}
589
/// The state of a single round attempt
///
/// For each attempt that we participate in, we keep the state of our concrete
/// participation.
pub enum AttemptState {
	// NOTE(review): nothing in the visible part of this file constructs this
	// variant -- confirm whether it is still used.
	AwaitingAttempt,
	/// We submitted our payment and wait for the unsigned VTXO tree
	///
	/// This is the state returned by [`start_attempt`].
	AwaitingUnsignedVtxoTree {
		/// One cosign key per requested output
		cosign_keys: Vec<Keypair>,
		/// For each requested output, the secret nonces matching the public
		/// nonces we submitted
		secret_nonces: Vec<Vec<DangerousSecretNonce>>,
		/// The unlock hash the server returned for our submission
		unlock_hash: UnlockHash,
	},
	// NOTE(review): presumably entered after the VTXO tree was cosigned;
	// the transition is not visible in this part of the file.
	AwaitingFinishedRound {
		unsigned_round_tx: Transaction,
		vtxos_spec: VtxoTreeSpec,
		unlock_hash: UnlockHash,
	},
}
607
608impl AttemptState {
609	/// The state kind represented as a string
610	fn kind(&self) -> &'static str {
611		match self {
612			Self::AwaitingAttempt => "AwaitingAttempt",
613			Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
614			Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
615		}
616	}
617}
618
/// Result from trying to progress an ongoing round attempt
enum AttemptProgressResult {
	/// The interactive part of the round finished
	Finished {
		/// The round funding transaction
		funding_tx: Transaction,
		/// Our new, not yet unlocked, output VTXOs
		vtxos: Vec<Vtxo<Full>>,
		/// The unlock hash of the new VTXOs
		unlock_hash: UnlockHash,
	},
	/// The attempt failed; the caller marks the whole round as failed
	Failed(anyhow::Error),
	/// When the state changes, this variant is returned
	///
	/// If during the processing, we have signed any forfeit txs and tried
	/// sending them to the server, the [UnconfirmedRound] instance is returned
	/// so that it can be stored in the state.
	//
	// NOTE(review): this variant only carries the new attempt state and no
	// `UnconfirmedRound` is visible in this file; the paragraph above looks
	// stale -- confirm and remove.
	Updated {
		new_state: AttemptState,
	},
	/// The event did not change the attempt state
	NotUpdated,
}
637
/// Participate in the new round attempt by submitting our round participation
///
/// Generates a fresh cosign key and a set of musig nonces for every requested
/// output, attests ownership of every input for this attempt's challenge,
/// registers the input VTXO transactions with the server and then submits
/// the payment.
///
/// On success returns [`AttemptState::AwaitingUnsignedVtxoTree`] holding the
/// cosign keys, the secret nonces and the unlock hash returned by the server.
///
/// Returns an error if the server is not available, a VTXO key can't be
/// fetched, registering the input transactions fails, the server refuses the
/// submission or returns an invalid unlock hash.
async fn start_attempt(
	wallet: &Wallet,
	participation: &RoundParticipation,
	event: &RoundAttempt,
) -> anyhow::Result<AttemptState> {
	let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;

	// Assign cosign pubkeys to the payment requests.
	let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
		.take(participation.outputs.len())
		.collect::<Vec<_>>();

	// Prepare round participation info.
	// For each of our requested vtxo output, we need a set of public and secret nonces.
	let cosign_nonces = cosign_keys.iter()
		.map(|key| {
			let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
			let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
			for _ in 0..ark_info.nb_round_nonces {
				let (s, p) = musig::nonce_pair(key);
				secs.push(s);
				pubs.push(p);
			}
			(secs, pubs)
		})
		// NB `cosign_keys` already has exactly one entry per output, so this
		// `take` does not drop anything
		.take(participation.outputs.len())
		.collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();


	// The round has now started. We can submit our payment.
	debug!("Submitting payment request with {} inputs and {} vtxo outputs",
		participation.inputs.len(), participation.outputs.len(),
	);

	// Build signed requests with mailbox IDs
	// NOTE(review): the mailbox id sent to the server is the wallet's own
	// identifier; `participation.unblinded_mailbox_id` is not used here -- confirm.
	let unblinded_mailbox_id = wallet.mailbox_identifier();
	let signed_reqs = participation.outputs.iter()
		.zip(cosign_keys.iter())
		.zip(cosign_nonces.iter())
		.map(|((req, cosign_key), (_sec, pub_nonces))| {
			SignedVtxoRequest {
				vtxo: req.clone(),
				cosign_pubkey: cosign_key.public_key(),
				nonces: pub_nonces.clone(),
			}
		})
		.collect::<Vec<_>>();

	let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
	for vtxo in participation.inputs.iter() {
		// NOTE(review): wrapping in `HarkForfeitError::Err` inside a function
		// returning `anyhow::Result` only adds the "error before forfeits were
		// sent" layer to the error chain -- confirm this is intended.
		let keypair = wallet.get_vtxo_key(vtxo).await
			.map_err(HarkForfeitError::Err)?;
		input_vtxos.push(protos::InputVtxo {
			vtxo_id: vtxo.id().to_bytes().to_vec(),
			attestation: {
				let attestation = RoundAttemptAttestation::new(
					event.challenge, vtxo.id(), &signed_reqs, &keypair,
				);
				attestation.serialize()
			},
		});
	}

	// Register VTXO transaction chains with server before round participation
	wallet.register_vtxo_transactions_with_server(&participation.inputs).await
		.map_err(HarkForfeitError::Err)?;

	let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
		input_vtxos: input_vtxos,
		vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
		#[allow(deprecated)]
		offboard_requests: vec![],
		unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
	}).await.context("Ark server refused our payment submission")?;

	Ok(AttemptState::AwaitingUnsignedVtxoTree {
		unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
		cosign_keys: cosign_keys,
		// keep the secret nonces in the state so they can be used when signing later
		secret_nonces: cosign_nonces.into_iter()
			.map(|(sec, _pub)| sec.into_iter()
				.map(DangerousSecretNonce::dangerous_from_secret_nonce)
				.collect())
			.collect(),
	})
}
724
/// just an internal type; need Error trait to work with anyhow
///
/// Distinguishes errors by whether our forfeit signatures may already have
/// reached the server, so that callers can record that fact
/// (see `RoundState::sent_forfeit_sigs`).
#[derive(Debug, thiserror::Error)]
enum HarkForfeitError {
	/// An error happened after we sent forfeit signatures to the server
	#[error("error after forfeits were sent")]
	SentForfeits(#[source] anyhow::Error),
	/// An error happened before we sent forfeit signatures to the server
	#[error("error before forfeits were sent")]
	Err(#[source] anyhow::Error),
}
735
736async fn hark_cosign_leaf(
737	wallet: &Wallet,
738	srv: &mut ServerConnection,
739	funding_tx: &Transaction,
740	vtxo: &mut Vtxo<Full>,
741) -> anyhow::Result<()> {
742	let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
743		.context("error fetching keypair").map_err(HarkForfeitError::Err)?
744		.with_context(|| format!(
745			"keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
746		))?.1;
747	let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
748	let cosign_resp = srv.client.request_leaf_vtxo_cosign(
749		protos::LeafVtxoCosignRequest::from(cosign_req),
750	).await
751		.with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
752		.into_inner().try_into()
753		.context("bad leaf vtxo cosign response")?;
754	ensure!(ctx.finalize(vtxo, cosign_resp),
755		"failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
756	);
757
758	Ok(())
759}
760
/// Finish the hArk VTXO swap protocol
///
/// This includes:
/// - requesting cosignature of the locked hArk leaves
/// - sending forfeit txs to the server in return for the unlock preimage
///
/// NB all the actions in this function are idempotent, meaning that the server
/// allows them to be done multiple times. this means that if this function call
/// fails, it's safe to just call it again at a later time
///
/// On success every VTXO in `output_vtxos` has been given the unlock
/// preimage and passes validation against `funding_tx`.
///
/// Errors from before the forfeit call are returned as
/// [`HarkForfeitError::Err`]; errors from the forfeit call onwards are
/// returned as [`HarkForfeitError::SentForfeits`].
async fn hark_vtxo_swap(
	wallet: &Wallet,
	participation: &RoundParticipation,
	output_vtxos: &mut [Vtxo<Full>],
	funding_tx: &Transaction,
	unlock_hash: UnlockHash,
) -> Result<(), HarkForfeitError> {
	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;

	// before we start make sure the server has our input vtxo signatures
	wallet.register_vtxo_transactions_with_server(&participation.inputs).await
		.context("couldn't send our input vtxo transactions to server")
		.map_err(HarkForfeitError::Err)?;

	// first get the leaves signed
	for vtxo in output_vtxos.iter_mut() {
		hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
			.map_err(HarkForfeitError::Err)?;
	}

	// then do the forfeit dance

	let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
		unlock_hash: unlock_hash.to_byte_array().to_vec(),
		vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
	}).await
		.context("request forfeits nonces call failed")
		.map_err(HarkForfeitError::Err)?
		.into_inner().public_nonces.into_iter()
		.map(|b| musig::PublicNonce::from_bytes(b))
		.collect::<Result<Vec<_>, _>>()
		.context("invalid forfeit nonces")
		.map_err(HarkForfeitError::Err)?;

	// the nonces are matched to the inputs by position below, so the counts must agree
	if server_nonces.len() != participation.inputs.len() {
		return Err(HarkForfeitError::Err(anyhow!(
			"server sent {} nonce pairs, expected {}",
			server_nonces.len(), participation.inputs.len(),
		)));
	}

	let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
	for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
		// NB `.ok()` discards the underlying lookup error; both a failed lookup
		// and an unknown key end up with the same message
		let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
			.ok().flatten().with_context(|| format!(
				"failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
			)).map_err(HarkForfeitError::Err)?.1;
		forfeit_bundles.push(HashLockedForfeitBundle::new(
			input, unlock_hash, &user_key, &nonces,
		))
	}

	// from here on the server might have received our forfeit signatures,
	// so every error is reported as `SentForfeits`
	let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
		forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
	}).await
		.context("forfeit vtxos call failed")
		.map_err(HarkForfeitError::SentForfeits)?
		.into_inner().unlock_preimage.as_slice().try_into()
		.context("invalid preimage length")
		.map_err(HarkForfeitError::SentForfeits)?;

	for vtxo in output_vtxos.iter_mut() {
		if !vtxo.provide_unlock_preimage(preimage) {
			return Err(HarkForfeitError::SentForfeits(anyhow!(
				"invalid preimage {} for vtxo {} with supposed unlock hash {}",
				preimage.as_hex(), vtxo.id(), unlock_hash,
			)));
		}

		// then validate the vtxo works
		vtxo.validate(&funding_tx).with_context(|| format!(
			"new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
		)).map_err(HarkForfeitError::SentForfeits)?;
	}

	Ok(())
}
847
848fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
849	match vtxo.validate(funding_tx) {
850		Err(VtxoValidationError::GenesisTransition {
851			genesis_idx, genesis_len, transition_kind, ..
852		}) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
853		Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
854			vtxo.serialize_hex(),
855		)),
856		Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
857	}
858}
859
860fn check_round_matches_participation(
861	part: &RoundParticipation,
862	new_vtxos: &[Vtxo<Full>],
863	funding_tx: &Transaction,
864) -> anyhow::Result<()> {
865	ensure!(new_vtxos.len() == part.outputs.len(),
866		"unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
867	);
868
869	for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
870		ensure!(vtxo.amount() == req.amount,
871			"unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
872		);
873		ensure!(*vtxo.policy() == req.policy,
874			"unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
875		);
876
877		// We accept the VTXO if only the hArk transition (last) failure happens
878		check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
879	}
880
881	Ok(())
882}
883
884/// Check the confirmation status of a funding tx
885///
886/// Returns true if the funding tx is confirmed deeply enough for us to accept it.
887/// The required number of confirmations depends on the wallet's configuration.
888///
889/// Returns false if the funding tx seems valid but not confirmed yet.
890///
891/// Returns an error if the chain source fails or if we can't submit the tx to the
892/// mempool, suggesting it might be double spent.
893async fn check_funding_tx_confirmations(
894	wallet: &Wallet,
895	funding_txid: Txid,
896	funding_tx: &Transaction,
897) -> anyhow::Result<bool> {
898	let tip = wallet.inner.chain.tip().await.context("chain source error")?;
899	let conf_height = tip - wallet.inner.config.round_tx_required_confirmations + 1;
900	let tx_status = wallet.inner.chain.tx_status(funding_txid).await.context("chain source error")?;
901	trace!("Round funding tx {} confirmation status: {:?} (tip={})",
902		funding_txid, tx_status, tip,
903	);
904	match tx_status {
905		TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
906		TxStatus::Mempool | TxStatus::Confirmed(_) => {
907			if wallet.inner.config.round_tx_required_confirmations == 0 {
908				debug!("Accepting round funding tx without confirmations because of configuration");
909				Ok(true)
910			} else {
911				trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
912				Ok(false)
913			}
914		},
915		TxStatus::NotFound => {
916			// let's try to submit it to our mempool
917			//TODO(stevenroose) change this to an explicit "testmempoolaccept" so that we can
918			// reliably distinguish the cases of our chain source having issues and the tx
919			// actually being rejected which suggests the round was double-spent
920			if let Err(e) = wallet.inner.chain.broadcast_tx(&funding_tx).await {
921				Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
922					funding_txid, serialize_hex(funding_tx), e,
923				))
924			} else {
925				trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
926				Ok(false)
927			}
928		},
929	}
930}
931
/// Outcome of an attempt to progress a delegated (hArk) round participation,
/// as returned by `progress_delegated`.
enum HarkProgressResult {
	/// The server reports the participation as still pending.
	RoundPending,
	/// The server answered the status request with a "not found" error code.
	RoundNotFound,
	/// The funding tx is known, but it is not confirmed deeply enough yet.
	FundingTxUnconfirmed {
		funding_txid: Txid,
	},
	/// The new VTXOs were received, matched our participation and the call
	/// to `hark_vtxo_swap` succeeded.
	Ok {
		funding_tx: Transaction,
		new_vtxos: Vec<Vtxo<Full>>,
	},
}
943
944async fn progress_delegated(
945	wallet: &Wallet,
946	participation: &RoundParticipation,
947	unlock_hash: UnlockHash,
948) -> Result<HarkProgressResult, HarkForfeitError> {
949	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
950
951	let resp = match srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
952		unlock_hash: unlock_hash.to_byte_array().to_vec(),
953	}).await {
954		Ok(resp) => resp.into_inner(),
955		Err(err) if err.code() == tonic::Code::NotFound => {
956			return Ok(HarkProgressResult::RoundNotFound);
957		},
958		Err(err) => {
959			return Err(HarkForfeitError::Err(
960				anyhow::Error::from(err).context("error checking round participation status"),
961			));
962		},
963	};
964	let status = protos::RoundParticipationStatus::try_from(resp.status)
965		.context("unknown status from server")
966		.map_err(HarkForfeitError::Err)	?;
967
968	if status == protos::RoundParticipationStatus::RoundPartPending {
969		trace!("Hark round still pending");
970		return Ok(HarkProgressResult::RoundPending);
971	}
972
973	// Since we got here, we clearly don't think we're finished.
974	// So even if the server thinks we did the dance before, we need the
975	// cosignature on the leaf tx so we need to do the dance again.
976	// "Guilty feet have got no rhythm."
977	if status == protos::RoundParticipationStatus::RoundPartReleased {
978		let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
979		warn!("Server says preimage was already released for hArk participation \
980			with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
981		);
982	}
983
984	let funding_tx_bytes = resp.round_funding_tx
985		.context("funding txid should be provided when status is not pending")
986		.map_err(HarkForfeitError::Err)?;
987	let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
988		.context("invalid funding txid")
989		.map_err(HarkForfeitError::Err)?;
990	let funding_txid = funding_tx.compute_txid();
991	trace!("Funding tx for round participation with unlock hash {}: {} ({})",
992		unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
993	);
994
995	// Check the confirmation status of the funding tx
996	match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
997		Ok(true) => {},
998		Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
999		Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
1000	}
1001
1002	let mut new_vtxos = resp.output_vtxos.into_iter()
1003		.map(|v| <Vtxo<Full>>::deserialize(&v))
1004		.collect::<Result<Vec<_>, _>>()
1005		.context("invalid output VTXOs from server")
1006		.map_err(HarkForfeitError::Err)?;
1007
1008	// Check that the vtxos match our participation in the exact order
1009	check_round_matches_participation(participation, &new_vtxos, &funding_tx)
1010		.context("new VTXOs received from server don't match our participation")
1011		.map_err(HarkForfeitError::Err)?;
1012
1013	hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
1014		.context("error forfeiting hArk VTXOs")
1015		.map_err(HarkForfeitError::SentForfeits)?;
1016
1017	Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
1018}
1019
1020async fn progress_attempt(
1021	state: &AttemptState,
1022	wallet: &Wallet,
1023	part: &RoundParticipation,
1024	event: &RoundEvent,
1025) -> AttemptProgressResult {
1026	// we will match only the states and messages required to make progress,
1027	// all else we ignore, except an unexpected finish
1028
1029	match (state, event) {
1030
1031		(
1032			AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
1033			RoundEvent::VtxoProposal(e),
1034		) => {
1035			trace!("Received VtxoProposal: {:#?}", e);
1036			match sign_vtxo_tree(
1037				wallet,
1038				part,
1039				&cosign_keys,
1040				&secret_nonces,
1041				&e.unsigned_round_tx,
1042				&e.vtxos_spec,
1043				&e.cosign_agg_nonces,
1044			).await {
1045				Ok(()) => {
1046					AttemptProgressResult::Updated {
1047						new_state: AttemptState::AwaitingFinishedRound {
1048							unsigned_round_tx: e.unsigned_round_tx.clone(),
1049							vtxos_spec: e.vtxos_spec.clone(),
1050							unlock_hash: *unlock_hash,
1051						},
1052					}
1053				},
1054				Err(e) => {
1055					trace!("Error signing VTXO tree: {:#}", e);
1056					AttemptProgressResult::Failed(e)
1057				},
1058			}
1059		},
1060
1061		(
1062			AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1063			RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1064		) => {
1065			if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1066				return AttemptProgressResult::Failed(anyhow!(
1067					"signed funding tx ({}) doesn't match tx received before ({})",
1068					signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1069				));
1070			}
1071
1072			if let Err(e) = wallet.inner.chain.broadcast_tx(&signed_round_tx).await {
1073				warn!("Failed to broadcast signed round tx: {:#}", e);
1074			}
1075
1076			match construct_new_vtxos(
1077				part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1078			).await {
1079				Ok(v) => AttemptProgressResult::Finished {
1080					funding_tx: signed_round_tx.clone(),
1081					vtxos: v,
1082					unlock_hash: *unlock_hash,
1083				},
1084				Err(e) => AttemptProgressResult::Failed(anyhow!(
1085					"failed to construct new VTXOs for round: {:#}", e,
1086				)),
1087			}
1088		},
1089
1090		(state, RoundEvent::Finished(RoundFinished { .. })) => {
1091			AttemptProgressResult::Failed(anyhow!(
1092				"unexpectedly received a finished round while we were in state {}",
1093				state.kind(),
1094			))
1095		},
1096
1097		(state, _) => {
1098			trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1099			AttemptProgressResult::NotUpdated
1100		},
1101	}
1102}
1103
1104async fn sign_vtxo_tree(
1105	wallet: &Wallet,
1106	participation: &RoundParticipation,
1107	cosign_keys: &[Keypair],
1108	secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1109	unsigned_round_tx: &Transaction,
1110	vtxo_tree: &VtxoTreeSpec,
1111	cosign_agg_nonces: &[musig::AggregatedNonce],
1112) -> anyhow::Result<()> {
1113	let (srv, _) = wallet.require_server().await.context("server not available")?;
1114
1115	let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1116
1117	// Check that the proposal contains our inputs.
1118	let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1119	for vtxo_req in vtxo_tree.iter_vtxos() {
1120		if let Some(i) = my_vtxos.iter().position(|v| {
1121			v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1122		}) {
1123			my_vtxos.swap_remove(i);
1124		}
1125	}
1126	if !my_vtxos.is_empty() {
1127		bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1128	}
1129
1130	let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1131	let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1132	trace!("Sending vtxo signatures to server...");
1133	let _ = try_join_all(iter.map(|((req, key), sec)| async {
1134		let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1135		let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1136		let part_sigs = unsigned_vtxos.cosign_branch(
1137			&cosign_agg_nonces, leaf_idx, key, secret_nonces,
1138		).context("failed to cosign branch: our request not part of tree")?;
1139
1140		info!("Sending {} partial vtxo cosign signatures for pk {}",
1141			part_sigs.len(), key.public_key(),
1142		);
1143
1144		let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1145			pubkey: key.public_key().serialize().to_vec(),
1146			signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1147		}).await.context("error sending vtxo signatures")?;
1148
1149		Result::<(), anyhow::Error>::Ok(())
1150	})).await.context("error sending VTXO signatures")?;
1151	trace!("Done sending vtxo signatures to server");
1152
1153	Ok(())
1154}
1155
1156async fn construct_new_vtxos(
1157	participation: &RoundParticipation,
1158	unsigned_round_tx: &Transaction,
1159	vtxo_tree: &VtxoTreeSpec,
1160	vtxo_cosign_sigs: &[schnorr::Signature],
1161) -> anyhow::Result<Vec<Vtxo<Full>>> {
1162	let round_txid = unsigned_round_tx.compute_txid();
1163	let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1164	let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1165
1166	// Validate the vtxo tree and cosign signatures.
1167	if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1168		// bad server!
1169		bail!("Received incorrect vtxo cosign signatures from server");
1170	}
1171
1172	let signed_vtxos = vtxo_tree
1173		.into_signed_tree(vtxo_cosign_sigs.to_vec())
1174		.into_cached_tree();
1175
1176	let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1177	let total_nb_expected_vtxos = expected_vtxos.len();
1178
1179	let mut new_vtxos = vec![];
1180	for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1181		if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1182			let vtxo = signed_vtxos.build_vtxo(idx);
1183
1184			// validate the received vtxos
1185			// This is more like a sanity check since we crafted them ourselves.
1186			check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1187				.context("constructed invalid vtxo from tree")?;
1188
1189			info!("New VTXO from round: {} ({}, {})",
1190				vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1191			);
1192
1193			new_vtxos.push(vtxo);
1194			expected_vtxos.swap_remove(expected_idx);
1195		}
1196	}
1197
1198	if !expected_vtxos.is_empty() {
1199		if expected_vtxos.len() == total_nb_expected_vtxos {
1200			// we must have done something wrong
1201			bail!("None of our VTXOs were present in round!");
1202		} else {
1203			bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1204				expected_vtxos.len(), expected_vtxos,
1205			);
1206		}
1207	}
1208	Ok(new_vtxos)
1209}
1210
1211//TODO(stevenroose) should be made idempotent
1212async fn persist_round_success(
1213	wallet: &Wallet,
1214	participation: &RoundParticipation,
1215	movement_id: Option<MovementId>,
1216	new_vtxos: &[Vtxo<Full>],
1217	funding_tx: &Transaction,
1218) -> anyhow::Result<()> {
1219	debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1220		new_vtxos.len(), movement_id,
1221	);
1222
1223	// we first try all actions that need to happen and only afterwards return errors
1224	// so that we achieve maximum success
1225
1226	let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1227		.context("failed to store new VTXOs");
1228	let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1229		.context("failed to mark input VTXOs as spent");
1230	let update_result = if let Some(mid) = movement_id {
1231		wallet.inner.movements.finish_movement_with_update(
1232			mid,
1233			MovementStatus::Successful,
1234			MovementUpdate::new()
1235				.produced_vtxos(new_vtxos)
1236				.metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1237		).await.context("failed to mark movement as finished")
1238	} else {
1239		Ok(())
1240	};
1241
1242	store_result?;
1243	spent_result?;
1244	update_result?;
1245
1246	Ok(())
1247}
1248
1249async fn persist_round_failure(
1250	wallet: &Wallet,
1251	participation: &RoundParticipation,
1252	movement_id: Option<MovementId>,
1253) -> anyhow::Result<()> {
1254	debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1255	let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1256	let finish_result = if let Some(movement_id) = movement_id {
1257		wallet.inner.movements.finish_movement(movement_id, MovementStatus::Failed).await
1258	} else {
1259		Ok(())
1260	};
1261	if let Err(e) = &finish_result {
1262		error!("Failed to mark movement as failed: {:#}", e);
1263	}
1264	match (unlock_result, finish_result) {
1265		(Ok(()), Ok(())) => Ok(()),
1266		(Err(e), _) => Err(e),
1267		(_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1268	}
1269}
1270
1271async fn update_funding_txid(
1272	wallet: &Wallet,
1273	movement_id: MovementId,
1274	funding_txid: Txid,
1275) -> anyhow::Result<()> {
1276	wallet.inner.movements.update_movement(
1277		movement_id,
1278		MovementUpdate::new()
1279			.metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1280	).await.context("Unable to update funding txid of round")
1281}
1282
1283impl Wallet {
1284	/// Load and lock a single given round state (by id), waiting for the lock.
1285	///
1286	/// Returns `Some(state)` if the round state is found and locked, `None`
1287	/// if it is not found after acquiring the lock.
1288	pub async fn lock_wait_round_state(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState>> {
1289		let guard = self.inner.lock_manager.lock(
1290			&format!("{}.round.{}", self.fingerprint(), id),
1291			ROUND_LOCK_TIMEOUT,
1292		).await.with_context(|| format!(
1293			"timed out waiting for lock on round state {} (wallet {})",
1294			id, self.fingerprint(),
1295		))?;
1296
1297		if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1298			return Ok(Some(state.lock(guard)));
1299		}
1300
1301		Ok(None)
1302	}
1303
1304	/// Ask the server when the next round is scheduled to start
1305	pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1306		let (mut srv, _) = self.require_server().await?;
1307		let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1308		Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1309	}
1310
	/// Start a new round participation
	///
	/// This function will store the state in the db and mark the VTXOs as locked.
	/// If a `movement_kind` is given, a movement is created for the
	/// participation as well.
	///
	/// If storing or locking the fresh round state fails, the input VTXOs are
	/// unlocked again and the movement (if any) is marked as failed before
	/// the error is returned.
	///
	/// ### Return
	///
	/// - By default, the returned state will be locked to prevent race conditions.
	/// To unlock the state, [StoredRoundState::unlock()] can be called.
	pub async fn join_next_round(
		&self,
		participation: RoundParticipation,
		movement_kind: Option<RoundMovement>,
	) -> anyhow::Result<StoredRoundState> {
		// The movement guard is created with `OnDropStatus::Failed`, which also
		// covers the early returns below.
		let movement = if let Some(kind) = movement_kind {
			Some(self.inner.movements.new_guarded_movement_with_update(
				Subsystem::ROUND,
				kind.to_string(),
				OnDropStatus::Failed,
				participation.to_movement_update()?
			).await?)
		} else {
			None
		};
		let movement_id = movement.as_ref().map(|m| m.id());
		// collect the input ids first, `participation` is moved into the state
		let input_vtxos = participation.inputs.iter().map(|v| v.id()).collect::<Vec<_>>();
		let state = RoundState::new_interactive(participation, movement_id);

		self.lock_vtxos(&input_vtxos, movement_id.map(|m| m.into())).await
			.context("failed to lock input VTXOs")?;

		// Store the state and then load it back in locked form. Both steps are
		// wrapped in a closure so that an error from either of them ends up in
		// the single rollback path below.
		match (async || {
			let id = self.inner.db.store_round_state(&state).await?;
			Ok(self.lock_wait_round_state(id).await?
				.context("failed to lock fresh round state")?)
		})().await {
			Ok(state) => {
				// NOTE(review): presumably `stop` keeps the guard from marking the
				// movement as failed on drop -- confirm against the movement guard.
				if let Some(mut m) = movement {
					m.stop();
				}
				Ok(state)
			},
			Err(e) => {
				// Roll back. Note that an error during the rollback is returned
				// instead of the original error `e`.
				self.unlock_vtxos(&input_vtxos).await
					.context("failed to unlock input VTXOs")?;
				if let Some(mut m) = movement {
					m.fail().await.context("failed to mark movement as failed")?;
				}
				Err(e)
			},
		}
	}
1362
1363	/// Join next round in delegated mode
1364	pub async fn join_next_round_delegated(
1365		&self,
1366		participation: RoundParticipation,
1367		movement_kind: Option<RoundMovement>,
1368	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1369		let movement = if let Some(kind) = movement_kind {
1370			Some(self.inner.movements.new_guarded_movement_with_update(
1371				Subsystem::ROUND,
1372				kind.to_string(),
1373				OnDropStatus::Failed,
1374				participation.to_movement_update()?,
1375			).await?)
1376		} else {
1377			None
1378		};
1379		let movement_id = movement.as_ref().map(|m| m.id());
1380
1381		let input_ids = participation.inputs.iter().map(|v| v.id()).collect::<Vec<_>>();
1382		self.lock_vtxos(&input_ids, movement_id.map(|m| m.into())).await
1383			.context("error locking input VTXOs")?;
1384
1385		match self.join_next_round_delegated_inner(participation, movement_id).await {
1386			Ok(state) => {
1387				if let Some(mut m) = movement {
1388					m.stop();
1389				}
1390				Ok(state)
1391			},
1392			Err(e) => {
1393				self.unlock_vtxos(&input_ids).await
1394					.context("error unlocking input VTXOs")?;
1395				if let Some(mut m) = movement {
1396					m.fail().await.context("error marking movement as failed")?;
1397				}
1398				Err(e)
1399			},
1400		}
1401	}
1402
1403	async fn join_next_round_delegated_inner(
1404		&self,
1405		participation: RoundParticipation,
1406		movement_id: Option<MovementId>,
1407	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1408		let (mut srv, _) = self.require_server().await?;
1409
1410		// Get mailbox identifier for VTXO delivery
1411		let unblinded_mailbox_id = self.mailbox_identifier();
1412
1413		// Register VTXO transaction chains with server before round participation
1414		self.register_vtxo_transactions_with_server(&participation.inputs).await
1415			.context("failed to register input vtxo transactions with server")?;
1416
1417		// Generate attestations for input vtxos
1418		let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1419		for vtxo in participation.inputs.iter() {
1420			let keypair = self.get_vtxo_key(vtxo).await
1421				.context("failed to get vtxo keypair")?;
1422			input_vtxos.push(protos::InputVtxo {
1423				vtxo_id: vtxo.id().to_bytes().to_vec(),
1424				attestation: {
1425					let attestation = DelegatedRoundParticipationAttestation::new(
1426						vtxo.id(), &participation.outputs, &keypair,
1427					);
1428					attestation.serialize()
1429				},
1430			});
1431		}
1432
1433		// Build proto VtxoRequests
1434		let vtxo_requests = participation.outputs.iter()
1435			.map(|req|
1436				protos::VtxoRequest {
1437					policy: req.policy.serialize(),
1438					amount: req.amount.to_sat(),
1439			})
1440			.collect::<Vec<_>>();
1441
1442		// Submit participation to server and get unlock_hash
1443		let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1444			input_vtxos,
1445			vtxo_requests,
1446			unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
1447		}).await.context("error submitting round participation to server")?.into_inner();
1448
1449		let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1450			.context("invalid unlock hash from server")?;
1451
1452		let state = RoundState::new_delegated(participation, unlock_hash, movement_id);
1453
1454		info!("Delegated round participation submitted, it will automatically execute \
1455			when you next sync your wallet after the round happened \
1456			(and has sufficient confirmations).",
1457		);
1458
1459		let id = self.inner.db.store_round_state(&state).await?;
1460		Ok(StoredRoundState::new(id, state))
1461	}
1462
	/// Get the ids of all pending round states
	///
	/// Only the ids are read from the database; the states themselves are
	/// not loaded.
	pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
		self.inner.db.get_pending_round_state_ids().await
	}
1467
1468	/// Get all pending round states
1469	pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1470		let ids = self.inner.db.get_pending_round_state_ids().await?;
1471		let mut states = Vec::with_capacity(ids.len());
1472		for id in ids {
1473			if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1474				states.push(state);
1475			}
1476		}
1477		Ok(states)
1478	}
1479
1480	/// Balance locked in pending rounds
1481	pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1482		let mut ret = Amount::ZERO;
1483		for round in self.pending_round_states().await? {
1484			ret += round.state().pending_balance();
1485		}
1486		Ok(ret)
1487	}
1488
1489	/// Returns all VTXOs that are locked in a pending round
1490	///
1491	/// This excludes all input VTXOs for which the output VTXOs have already
1492	/// been created.
1493	pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1494		let mut ret = Vec::new();
1495		for round in self.pending_round_states().await? {
1496			let inputs = round.state().locked_pending_inputs();
1497			ret.reserve(inputs.len());
1498			for input in inputs {
1499				let v = self.get_vtxo_by_id(input.id()).await
1500					.context("unknown round input VTXO")?;
1501				ret.push(v);
1502			}
1503		}
1504		Ok(ret)
1505	}
1506
	/// Sync pending rounds that have finished but are waiting for confirmations
	///
	/// Round states with an ongoing participation are skipped, as no round
	/// events are processed here. All other pending round states are locked,
	/// synced and then updated in or removed from the database depending on
	/// their new status. Up to 10 round states are handled concurrently.
	///
	/// Returns the status of every round state that was synced successfully.
	/// Round states that could not be locked or synced are only logged and
	/// are absent from the returned map.
	pub async fn sync_pending_rounds(&self) -> anyhow::Result<HashMap<RoundStateId, RoundStatus>> {
		let states = self.pending_round_states().await?;
		if states.is_empty() {
			return Ok(HashMap::new());
		}

		debug!("Syncing {} pending round states...", states.len());

		// shared between the concurrently running futures below
		let ret = Arc::new(parking_lot::Mutex::new(HashMap::with_capacity(states.len())));
		tokio_stream::iter(states).for_each_concurrent(10, |state| {
			let ret = ret.clone();
			async move {
				// not processing events here
				if state.state().ongoing_participation() {
					return;
				}

				// load the state again under its lock, it might have changed
				// or been removed since we listed it
				let mut state = match self.lock_wait_round_state(state.id()).await {
					Ok(Some(state)) => state,
					Ok(None) => return,
					Err(e) => {
						warn!("Error locking round state: {:#}", e);
						return;
					},
				};

				let status = match state.state_mut().sync(self).await {
					Ok(s) => s,
					Err(e) => {
						warn!("Error syncing round: {:#}", e);
						return;
					},
				};
				trace!("Synced round #{}, status: {:?}", state.id(), status);
				// Confirmed, failed and canceled rounds are removed from the db,
				// all others are updated. Database errors are only logged.
				match status {
					RoundStatus::Confirmed { funding_txid } => {
						info!("Round confirmed. Funding tx {}", funding_txid);
						if let Err(e) = self.inner.db.remove_round_state(&state).await {
							warn!("Error removing confirmed round state from db: {:#}", e);
						}
					},
					RoundStatus::Unconfirmed { funding_txid } => {
						info!("Waiting for confirmations for round funding tx {}", funding_txid);
						if let Err(e) = self.inner.db.update_round_state(&state).await {
							warn!("Error updating pending round state in db: {:#}", e);
						}
					},
					RoundStatus::Pending => {
						if let Err(e) = self.inner.db.update_round_state(&state).await {
							warn!("Error updating pending round state in db: {:#}", e);
						}
					},
					RoundStatus::Failed { ref error } => {
						error!("Round failed: {}", error);
						if let Err(e) = self.inner.db.remove_round_state(&state).await {
							warn!("Error removing failed round state from db: {:#}", e);
						}
					},
					RoundStatus::Canceled => {
						error!("Round canceled");
						if let Err(e) = self.inner.db.remove_round_state(&state).await {
							warn!("Error removing canceled round state from db: {:#}", e);
						}
					},
				}
				ret.lock().insert(state.id(), status);
			}
		}).await;

		// all futures are done, so their clones of the Arc are dropped
		Ok(Arc::into_inner(ret).expect("only ref left").into_inner())
	}
1579
1580	/// Fetch last round event from server
1581	async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1582		let (mut srv, _) = self.require_server().await?;
1583		let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1584		Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1585	}
1586
1587	async fn inner_process_event(
1588		&self,
1589		state: &mut StoredRoundState,
1590		event: Option<&RoundEvent>,
1591	) {
1592		if let Some(event) = event && state.state().ongoing_participation() {
1593			let updated = state.state_mut().process_event(self, &event).await;
1594			if updated {
1595				if let Err(e) = self.inner.db.update_round_state(&state).await {
1596					error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1597				}
1598			}
1599		}
1600
1601		match state.state_mut().sync(self).await {
1602			Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1603			Ok(s) if s.is_final() => {
1604				info!("Round #{} finished with result: {:?}", state.id(), s);
1605				if let Err(e) = self.inner.db.remove_round_state(&state).await {
1606					warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1607				}
1608			},
1609			Ok(s) => {
1610				trace!("Round state #{} is now in state {:?}", state.id(), s);
1611				if let Err(e) = self.inner.db.update_round_state(&state).await {
1612					warn!("Error storing round state #{}: {:#}", state.id(), e);
1613				}
1614			},
1615		}
1616	}
1617
1618	/// Try to make incremental progress on all pending round states
1619	///
1620	/// If the `last_round_event` argument is not provided, it will be fetched
1621	/// from the server.
1622	pub async fn progress_pending_rounds(
1623		&self,
1624		last_round_event: Option<&RoundEvent>,
1625	) -> anyhow::Result<()> {
1626		let states = self.pending_round_states().await?;
1627		if states.is_empty() {
1628			return Ok(());
1629		}
1630
1631		info!("Processing {} rounds...", states.len());
1632
1633		let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1634
1635		let has_ongoing_participation = states.iter()
1636			.any(|s| s.state().ongoing_participation());
1637		if has_ongoing_participation && last_round_event.is_none() {
1638			match self.get_last_round_event().await {
1639				Ok(e) => last_round_event = Some(Cow::Owned(e)),
1640				Err(e) => {
1641					warn!("Error fetching round event, \
1642						failed to progress ongoing rounds: {:#}", e);
1643				},
1644			}
1645		}
1646
1647		let event = last_round_event.as_ref().map(|c| c.as_ref());
1648
1649		let futs = states.into_iter().map(async |state| {
1650			let locked = self.lock_wait_round_state(state.id()).await?;
1651			if let Some(mut locked) = locked {
1652				self.inner_process_event(&mut locked, event).await;
1653			}
1654			Ok::<_, anyhow::Error>(())
1655		});
1656
1657		futures::future::join_all(futs).await;
1658
1659		Ok(())
1660	}
1661
1662	pub async fn subscribe_round_events(&self)
1663		-> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1664	{
1665		let (mut srv, _) = self.require_server().await?;
1666		let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1667			.into_inner().map(|m| {
1668				let m = m.context("received error on event stream")?;
1669				let e = RoundEvent::try_from(m.clone())
1670					.with_context(|| format!("error converting rpc round event: {:?}", m))?;
1671				trace!("Received round event: {}", e);
1672				Ok::<_, anyhow::Error>(e)
1673			});
1674		Ok(events)
1675	}
1676
1677	/// A blocking call that will try to perform a full round participation
1678	/// for all ongoing rounds
1679	///
1680	/// Returns only once there is no ongoing rounds anymore.
1681	pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1682		let mut events = self.subscribe_round_events().await?;
1683
1684		loop {
1685			// NB: we need to load all ongoing rounds on every iteration here
1686			// because some might be finished by another call
1687			let state_ids = self.pending_round_states().await?.iter()
1688				.filter(|s| s.state().ongoing_participation())
1689				.map(|s| s.id())
1690				.collect::<Vec<_>>();
1691
1692			if state_ids.is_empty() {
1693				info!("All rounds handled");
1694				return Ok(());
1695			}
1696
1697			let event = events.next().await
1698				.context("events stream broke")?
1699				.context("error on event stream")?;
1700
1701			let futs = state_ids.into_iter().map(async |state| {
1702				let locked = self.lock_wait_round_state(state).await?;
1703				if let Some(mut locked) = locked {
1704					self.inner_process_event(&mut locked, Some(&event)).await;
1705				}
1706				Ok::<_, anyhow::Error>(())
1707			});
1708
1709			futures::future::join_all(futs).await;
1710		}
1711	}
1712
1713	/// Will cancel all pending rounds that can safely be canceled
1714	///
1715	/// All rounds that have not started yet can safely be canceled,
1716	/// as well as rounds where we have not yet signed any forfeit txs.
1717	pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1718		// initial load to get all pending round states ids
1719		let state_ids = self.inner.db.get_pending_round_state_ids().await?;
1720
1721		let futures = state_ids.into_iter().map(|state_id| {
1722			async move {
1723				// wait for lock and load again to ensure most recent state
1724				let mut state = match self.lock_wait_round_state(state_id).await {
1725					Ok(Some(s)) => s,
1726					Ok(None) => return,
1727					Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1728				};
1729
1730				match state.state_mut().try_cancel(self).await {
1731					Ok(true) => {
1732						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1733							warn!("Error removing canceled round state from db: {:#}", e);
1734						}
1735					},
1736					Ok(false) => {},
1737					Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1738				}
1739			}
1740		});
1741
1742		join_all(futures).await;
1743
1744		Ok(())
1745	}
1746
1747	/// Try to cancel the given round
1748	pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1749		let mut state = self.lock_wait_round_state(id).await?
1750			.context("round state not found")?;
1751
1752		if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1753			self.inner.db.remove_round_state(&state).await
1754				.context("error removing canceled round state from db")?;
1755		} else {
1756			bail!("failed to cancel round");
1757		}
1758
1759		Ok(())
1760	}
1761
1762	/// Participate in a round
1763	///
1764	/// This function will start a new round participation and block until
1765	/// the round is finished.
1766	/// After this method returns the round state will be kept active until
1767	/// the round tx fully confirms.
1768	pub(crate) async fn participate_round(
1769		&self,
1770		participation: RoundParticipation,
1771		movement_kind: Option<RoundMovement>,
1772	) -> anyhow::Result<RoundStatus> {
1773		let mut state = self.join_next_round(participation, movement_kind).await?;
1774
1775		info!("Waiting for a round start...");
1776		let mut events = self.subscribe_round_events().await?;
1777
1778		loop {
1779			if !state.state().ongoing_participation() {
1780				let status = state.state_mut().sync(self).await?;
1781				match status {
1782					RoundStatus::Failed { error } => bail!("round failed: {}", error),
1783					RoundStatus::Canceled => bail!("round canceled"),
1784					status => return Ok(status),
1785				}
1786			}
1787
1788			let event = events.next().await
1789				.context("events stream broke")?
1790				.context("error on event stream")?;
1791			if state.state_mut().process_event(self, &event).await {
1792				self.inner.db.update_round_state(&state).await?;
1793			}
1794		}
1795	}
1796}