Skip to main content

bark/round/
mod.rs

1//!
2//! Round State Machine
3//!
4
5use std::iter;
6use std::borrow::Cow;
7use std::convert::Infallible;
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9
10use anyhow::Context;
11use ark::vtxo::VtxoValidationError;
12use bdk_esplora::esplora_client::Amount;
13use bip39::rand;
14use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
15use bitcoin::consensus::encode::{deserialize, serialize_hex};
16use bitcoin::hashes::Hash;
17use bitcoin::hex::DisplayHex;
18use bitcoin::key::Keypair;
19use bitcoin::secp256k1::schnorr;
20use futures::future::{join_all, try_join_all};
21use futures::{Stream, StreamExt};
22use log::{debug, error, info, trace, warn};
23
24use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
25use ark::vtxo::Full;
26use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
27use ark::forfeit::HashLockedForfeitBundle;
28use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
29use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
30use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
31use bitcoin_ext::TxStatus;
32use server_rpc::{protos, ServerConnection, TryFromBytes};
33
34use crate::{SECP, Wallet, WalletVtxo};
35use crate::movement::{MovementId, MovementStatus};
36use crate::movement::update::MovementUpdate;
37use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
38
/// How long [`Wallet::lock_wait_round_state`] waits for a contended
/// round lock before giving up. Long enough to outlast a normal round.
//
// NOTE(review): the "outlasts a normal round" claim cannot be checked from
// this file; confirm that 10 seconds really exceeds a typical round duration.
const ROUND_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
42use crate::subsystem::{RoundMovement, Subsystem};
43
44
/// The type string for the hArk leaf transition
///
/// Compared against the `transition_kind` of a
/// [`VtxoValidationError::GenesisTransition`] error to recognize the expected
/// hash-lock failure of a not-yet-unlocked VTXO.
const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
47
/// Struct to communicate your specific participation for an Ark round.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoundParticipation {
	/// The VTXOs that we provide as inputs to the round
	#[serde(with = "ark::encode::serde::vec")]
	pub inputs: Vec<Vtxo<Full>>,
	/// The output VTXOs that we request in the round,
	/// including change
	pub outputs: Vec<VtxoRequest>,
	/// Optional mailbox identifier for round completion notification
	///
	/// Omitted from the serialized form when `None` and defaulted to `None`
	/// when absent on deserialization.
	#[serde(default, skip_serializing_if = "Option::is_none", with = "ark::encode::serde::opt")]
	pub unblinded_mailbox_id: Option<ark::mailbox::MailboxIdentifier>,
}
60
61impl RoundParticipation {
62	pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
63		let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
64		let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
65		let fee = input_amount - output_amount;
66		Ok(MovementUpdate::new()
67			.consumed_vtxos(&self.inputs)
68			.intended_balance(SignedAmount::ZERO)
69			.effective_balance( - fee.to_signed()?)
70			.fee(fee)
71		)
72	}
73}
74
/// The status of a round participation, as returned by [`RoundState::sync`]
#[derive(Debug, Clone)]
pub enum RoundStatus {
	/// The round was successful and is fully confirmed
	Confirmed {
		/// The txid of the round funding transaction
		funding_txid: Txid,
	},
	/// Round successful but not fully confirmed
	Unconfirmed {
		/// The txid of the round funding transaction
		funding_txid: Txid,
	},
	/// Round didn't finish yet
	Pending,
	/// The round failed
	Failed {
		/// Human-readable description of the failure
		error: String,
	},
	/// User canceled the round
	Canceled,
}
94
95impl RoundStatus {
96	/// Whether this is the final state and it won't change anymore
97	pub fn is_final(&self) -> bool {
98		match self {
99			Self::Confirmed { .. } => true,
100			Self::Unconfirmed { .. } => false,
101			Self::Pending => false,
102			Self::Failed { .. } => true,
103			Self::Canceled => true,
104		}
105	}
106
107	/// Whether it looks like the round succeeded
108	pub fn is_success(&self) -> bool {
109		match self {
110			Self::Confirmed { .. } => true,
111			Self::Unconfirmed { .. } => true,
112			Self::Pending => false,
113			Self::Failed { .. } => false,
114			Self::Canceled => false,
115		}
116	}
117}
118
/// State of the progress of a round participation
///
/// An instance of this struct is kept all the way from the intention of joining
/// the next round, until either the round fully confirms or it fails and we are
/// sure it won't have any effect on our wallet.
///
/// As soon as we have signed forfeit txs for the round, we keep track of this
/// round attempt until we see another attempt we participated in confirm or
/// we gain confidence that the failed attempt will never confirm.
//
//TODO(stevenroose) move the id in here and have the state persist itself with the wallet
// to have better control. this way we can touch db before we sent forfeit sigs
pub struct RoundState {
	/// Round is fully done
	///
	/// Set by [`RoundState::sync`] after the successful round was persisted.
	pub(crate) done: bool,

	/// Our participation in this round
	pub(crate) participation: RoundParticipation,

	/// The flow of the round in case it is still ongoing with the server
	pub(crate) flow: RoundFlowState,

	/// The new output vtxos of this round participation
	///
	/// After we finish the interactive part, we fill this with the uncompleted
	/// VTXOs which we then try to complete with the unlock preimage.
	pub(crate) new_vtxos: Vec<Vtxo<Full>>,

	/// Whether we sent our forfeit signatures to the server
	///
	/// If we did this and the server refused to reveal our new VTXOs,
	/// we will be forced to exit.
	//TODO(stevenroose) implement exit when this is true and we can't make progress
	// probably based on the input vtxos becoming close to expiry
	pub(crate) sent_forfeit_sigs: bool,

	/// The ID of the [Movement] associated with this round
	///
	/// `None` when no movement is tracked for this round.
	pub(crate) movement_id: Option<MovementId>,
}
158
159impl RoundState {
160	fn new_interactive(
161		participation: RoundParticipation,
162		movement_id: Option<MovementId>,
163	) -> Self {
164		Self {
165			participation,
166			movement_id,
167			flow: RoundFlowState::InteractivePending,
168			new_vtxos: Vec::new(),
169			sent_forfeit_sigs: false,
170			done: false,
171		}
172	}
173
174	fn new_non_interactive(
175		participation: RoundParticipation,
176		unlock_hash: UnlockHash,
177		movement_id: Option<MovementId>,
178	) -> Self {
179		Self {
180			participation,
181			movement_id,
182			flow: RoundFlowState::NonInteractivePending { unlock_hash },
183			new_vtxos: Vec::new(),
184			sent_forfeit_sigs: false,
185			done: false,
186		}
187	}
188
	/// Our participation in this round
	///
	/// Borrows the inputs and requested outputs this state was created with.
	pub fn participation(&self) -> &RoundParticipation {
		&self.participation
	}
193
194	/// the unlock hash if already known
195	pub fn unlock_hash(&self) -> Option<UnlockHash> {
196		match self.flow {
197			RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
198			RoundFlowState::InteractivePending => None,
199			RoundFlowState::InteractiveOngoing { .. } => None,
200			RoundFlowState::Failed { .. } => None,
201			RoundFlowState::Canceled => None,
202			RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
203		}
204	}
205
206	pub fn funding_tx(&self) -> Option<&Transaction> {
207		match self.flow {
208			RoundFlowState::NonInteractivePending { .. } => None,
209			RoundFlowState::InteractivePending => None,
210			RoundFlowState::InteractiveOngoing { .. } => None,
211			RoundFlowState::Failed { .. } => None,
212			RoundFlowState::Canceled => None,
213			RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
214		}
215	}
216
217	/// Whether the interactive part of the round is still ongoing
218	pub fn ongoing_participation(&self) -> bool {
219		match self.flow {
220			RoundFlowState::NonInteractivePending { .. } => false,
221			RoundFlowState::InteractivePending => true,
222			RoundFlowState::InteractiveOngoing { .. } => true,
223			RoundFlowState::Failed { .. } => false,
224			RoundFlowState::Canceled => false,
225			RoundFlowState::Finished { .. } => false,
226		}
227	}
228
	/// Tries to cancel the round and returns whether it was successfully canceled
	/// or if it was already canceled or failed
	///
	/// Returns `Ok(false)` only when the flow already finished. Whenever `true`
	/// is about to be returned, the round failure is persisted with the wallet.
	///
	/// # Errors
	///
	/// Returns an error if persisting the round failure fails.
	///
	/// # Panics
	///
	/// Panics for non-interactive pending rounds: canceling those with the
	/// server is not implemented yet.
	pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
		let ret = match self.flow {
			RoundFlowState::NonInteractivePending { .. } => todo!("we have to cancel with server!"),
			RoundFlowState::Canceled => true,
			RoundFlowState::Failed { .. } => true,
			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
				// interactive flows are canceled locally by switching the flow
				self.flow = RoundFlowState::Canceled;
				true
			},
			RoundFlowState::Finished { .. } => false,
		};
		if ret {
			persist_round_failure(wallet, &self.participation, self.movement_id).await
				.context("failed to persist round failure for cancelation")?;
		}
		Ok(ret)
	}
248
249	async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
250		match start_attempt(wallet, &self.participation, attempt).await {
251			Ok(state) => {
252				self.flow = RoundFlowState::InteractiveOngoing {
253					round_seq: attempt.round_seq,
254					attempt_seq: attempt.attempt_seq,
255					state: state,
256				};
257			},
258			Err(e) => {
259				self.flow = RoundFlowState::Failed {
260					error: format!("{:#}", e),
261				};
262			},
263		}
264	}
265
	/// Processes the given event and returns true if some update was made to the state
	///
	/// Only the interactive flows react to events:
	/// - while pending, the first attempt (attempt seq 0) of a round is joined;
	/// - while ongoing, the event is checked against the round and attempt we
	///   are in and then handed to the attempt state machine.
	///
	/// All other flows ignore the event and return false. Failures are not
	/// returned; they are recorded by switching the flow to
	/// [`RoundFlowState::Failed`].
	pub async fn process_event(
		&mut self,
		wallet: &Wallet,
		event: &RoundEvent,
	) -> bool {
		// every arm returns, so the match itself never produces a value
		let _: Infallible = match self.flow {
			RoundFlowState::InteractivePending => {
				if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
					trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
					self.try_start_attempt(wallet, e).await;
					return true;
				} else {
					trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
						event.kind(), event.round_seq(), event.attempt_seq(),
					);
					return false;
				}
			},
			RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
				// here we catch the cases where we're in a wrong flow

				// the server explicitly failed the round we are in
				if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
					warn!("Round {} failed by server", round_seq);
					self.flow = RoundFlowState::Failed {
						error: format!("round {} failed by server", round_seq),
					};
					return true;
				}

				if event.round_seq() > round_seq {
					// new round started, we don't support multiple parallel rounds,
					// this means we failed
					self.flow = RoundFlowState::Failed {
						error: format!("round {} started while we were on {}",
							event.round_seq(), round_seq,
						),
					};
					return true;
				}

				if event.attempt_seq() < attempt_seq {
					trace!("ignoring replayed message from old attempt");
					return false;
				}

				// a newer attempt started: restart our participation for that attempt
				if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
					trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
					self.try_start_attempt(wallet, e).await;
					return true;
				}
				trace!("Processing event {} for round attempt {}:{} in state {}",
					event.kind(), round_seq, attempt_seq, state.kind(),
				);

				return match progress_attempt(state, wallet, &self.participation, event).await {
					AttemptProgressResult::NotUpdated => false,
					AttemptProgressResult::Updated { new_state } => {
						*state = new_state;
						true
					},
					AttemptProgressResult::Failed(e) => {
						warn!("Round failed with error: {:#}", e);
						self.flow = RoundFlowState::Failed {
							error: format!("{:#}", e),
						};
						true
					},
					AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
						self.new_vtxos = vtxos;
						let funding_txid = funding_tx.compute_txid();
						self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
						// best effort: a failure to record the txid is only logged
						if let Some(mid) = self.movement_id {
							if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
								warn!("Error updating the round funding txid: {:#}", e);
							}
						}
						true
					},
				};
			},
			RoundFlowState::NonInteractivePending { .. }
				| RoundFlowState::Finished { .. }
				| RoundFlowState::Failed { .. }
				| RoundFlowState::Canceled => return false,
		};
	}
353
	/// Sync the round's status and return it
	///
	/// When success or failure is returned, the round state can be eliminated
	///
	/// Depending on the current flow this:
	/// - reports interactive flows as pending without doing any work;
	/// - persists the failure for failed and canceled flows;
	/// - asks the server for progress on a non-interactive participation;
	/// - for a finished interactive flow, checks the funding tx confirmations
	///   and then runs the hArk VTXO swap.
	///
	/// # Errors
	///
	/// Returns an error if persisting to the wallet fails, if the funding tx
	/// confirmations can't be checked, or if progressing or forfeiting fails.
	/// If the failure happened after forfeit signatures were sent,
	/// `sent_forfeit_sigs` is set before the error is returned.
	//TODO(stevenroose) make RoundState manage its own db record
	pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
		match self.flow {
			// already completed by an earlier sync, nothing left to do
			RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
				Ok(RoundStatus::Confirmed {
					funding_txid: funding_tx.compute_txid(),
				})
			},

			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
				Ok(RoundStatus::Pending)
			},
			RoundFlowState::Failed { ref error } => {
				persist_round_failure(wallet, &self.participation, self.movement_id).await
					.context("failed to persist round failure")?;
				Ok(RoundStatus::Failed { error: error.clone() })
			},
			RoundFlowState::Canceled => {
				persist_round_failure(wallet, &self.participation, self.movement_id).await
					.context("failed to persist round failure")?;
				Ok(RoundStatus::Canceled)
			},

			RoundFlowState::NonInteractivePending { unlock_hash } => {
				match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
					Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
					Ok(HarkProgressResult::RoundNotFound) => {
						self.handle_round_not_found(wallet).await
					},
					Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
						let funding_txid = funding_tx.compute_txid();
						self.new_vtxos = new_vtxos;
						self.flow = RoundFlowState::Finished {
							funding_tx: funding_tx.clone(),
							unlock_hash: unlock_hash,
						};

						persist_round_success(
							wallet,
							&self.participation,
							self.movement_id,
							&self.new_vtxos,
							&funding_tx,
						).await.context("failed to store successful round in DB!")?;

						// only mark done once the success is stored
						self.done = true;

						Ok(RoundStatus::Confirmed { funding_txid })
					},
					Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
						if let Some(mid) = self.movement_id {
							update_funding_txid(wallet, mid, funding_txid).await
								.context("failed to update funding txid in DB")?;
						}
						Ok(RoundStatus::Unconfirmed { funding_txid })
					},

					//TODO(stevenroose) should we mark as failed for these cases?

					Err(HarkForfeitError::Err(e)) => {
						//TODO(stevenroose) we failed here but we might actualy be able to
						// succeed if we retry. should we implement some kind of limited
						// retry after which we mark as failed?
						Err(e.context("error progressing non-interactive round"))
					},
					Err(HarkForfeitError::SentForfeits(e)) => {
						self.sent_forfeit_sigs = true;
						Err(e.context("error progressing non-interactive round \
							after sending forfeit tx signatures"))
					},
				}
			},
			// interactive part finished, but didn't forfeit yet
			RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
				let funding_txid = funding_tx.compute_txid();
				let confirmed = check_funding_tx_confirmations(
					wallet, funding_txid, &funding_tx,
				).await.context("error checking funding tx confirmations")?;
				if !confirmed {
					trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
					return Ok(RoundStatus::Unconfirmed { funding_txid });
				}

				match hark_vtxo_swap(
					wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
				).await {
					Ok(()) => {
						persist_round_success(
							wallet,
							&self.participation,
							self.movement_id,
							&self.new_vtxos,
							&funding_tx,
						).await.context("failed to store successful round in DB!")?;

						// only mark done once the success is stored
						self.done = true;

						Ok(RoundStatus::Confirmed { funding_txid })
					},
					Err(HarkForfeitError::Err(e)) => {
						Err(e.context("error forfeiting VTXOs after round"))
					},
					Err(HarkForfeitError::SentForfeits(e)) => {
						self.sent_forfeit_sigs = true;
						Err(e.context("error after having signed and sent \
							forfeit signatures to server"))
					},
				}
			},
		}
	}
468
469	/// Once we know the signed round funding tx, this returns the output VTXOs
470	/// for this round.
471	pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
472		if self.new_vtxos.is_empty() {
473			None
474		} else {
475			Some(&self.new_vtxos)
476		}
477	}
478
479	/// Returns the input VTXOs that are locked in this round, but only
480	/// if no output VTXOs were issued yet.
481	pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
482		//TODO(stevenroose) consider if we can't just drop the state after forfeit exchange
483		match self.flow {
484			RoundFlowState::NonInteractivePending { .. }
485				| RoundFlowState::InteractivePending
486				| RoundFlowState::InteractiveOngoing { .. }
487			=> {
488				&self.participation.inputs
489			},
490			RoundFlowState::Finished { .. } => if self.done {
491				// inputs already unlocked
492				&[]
493			} else {
494				&self.participation.inputs
495			},
496			RoundFlowState::Failed { .. }
497				| RoundFlowState::Canceled
498			=> {
499				// inputs already unlocked
500				&[]
501			},
502		}
503	}
504
505	/// The balance pending in this round
506	///
507	/// This becomes zero once the new round VTXOs are unlocked.
508	pub fn pending_balance(&self) -> Amount {
509		if self.done {
510			return Amount::ZERO;
511		}
512
513		match self.flow {
514			RoundFlowState::NonInteractivePending { .. }
515				| RoundFlowState::InteractivePending
516				| RoundFlowState::InteractiveOngoing { .. }
517				| RoundFlowState::Finished { .. }
518			=> {
519				self.participation.outputs.iter().map(|o| o.amount).sum()
520			},
521			RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
522				Amount::ZERO
523			},
524		}
525	}
526
527	/// Handle the case where the server reports our round participation as not found.
528	///
529	/// If we sent forfeit signatures (which only happens after the round was
530	/// confirmed), this is adversarial — trigger unilateral exit.
531	/// If we never sent forfeits, the server can't steal, so unlock the VTXOs
532	/// and verify with the server that they're still considered spendable.
533	async fn handle_round_not_found(
534		&mut self,
535		wallet: &Wallet,
536	) -> anyhow::Result<RoundStatus> {
537		info!("Server reports round participation not found (no forfeits sent)");
538		self.flow = RoundFlowState::Failed {
539			error: "server reports round participation not found".into(),
540		};
541		persist_round_failure(wallet, &self.participation, self.movement_id).await
542			.context("failed to persist round failure")?;
543
544		Ok(RoundStatus::Failed {
545			error: "server reports round participation not found".into(),
546		})
547	}
548}
549
/// The state of the process flow of a round
///
/// This tracks the progress of the interactive part of the round, from
/// waiting to start until finishing either successfully or with a failure.
pub enum RoundFlowState {
	/// We don't take part in the interactive flow and just wait for the
	/// round to finish
	NonInteractivePending {
		/// The unlock hash used to query the server for this participation
		unlock_hash: UnlockHash,
	},

	/// Waiting for round to happen
	InteractivePending,
	/// Interactive part ongoing
	InteractiveOngoing {
		/// Sequence number of the round we joined
		round_seq: RoundSeq,
		/// Sequence number of the attempt we joined within that round
		attempt_seq: usize,
		/// Our state within that attempt
		state: AttemptState,
	},

	/// Interactive part finished, waiting for confirmation
	Finished {
		/// The round funding transaction
		funding_tx: Transaction,
		/// The unlock hash of the round
		unlock_hash: UnlockHash,
	},

	/// Failed during round
	Failed {
		/// Human-readable description of the failure
		error: String,
	},

	/// User canceled round
	Canceled,
}
583
/// The state of a single round attempt
///
/// For each attempt that we participate in, we keep the state of our concrete
/// participation.
pub enum AttemptState {
	// NOTE(review): nothing in this part of the file constructs this variant;
	// presumably the state before a participation was submitted -- confirm.
	AwaitingAttempt,
	/// Our participation was submitted to the server for this attempt
	AwaitingUnsignedVtxoTree {
		/// One cosign keypair per requested output
		cosign_keys: Vec<Keypair>,
		/// For each requested output, the secret nonces generated for it
		secret_nonces: Vec<Vec<DangerousSecretNonce>>,
		/// The unlock hash the server returned for our submission
		unlock_hash: UnlockHash,
	},
	AwaitingFinishedRound {
		unsigned_round_tx: Transaction,
		vtxos_spec: VtxoTreeSpec,
		unlock_hash: UnlockHash,
	},
}
601
602impl AttemptState {
603	/// The state kind represented as a string
604	fn kind(&self) -> &'static str {
605		match self {
606			Self::AwaitingAttempt => "AwaitingAttempt",
607			Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
608			Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
609		}
610	}
611}
612
/// Result from trying to progress an ongoing round attempt
enum AttemptProgressResult {
	/// The interactive part completed
	///
	/// Carries the funding tx, the new VTXOs and the unlock hash, which
	/// `RoundState::process_event` stores in the round state.
	Finished {
		funding_tx: Transaction,
		vtxos: Vec<Vtxo<Full>>,
		unlock_hash: UnlockHash,
	},
	/// Progressing the attempt failed; the round is then marked as failed
	Failed(anyhow::Error),
	/// When the state changes, this variant is returned
	///
	/// The caller replaces the current attempt state with `new_state`.
	Updated {
		new_state: AttemptState,
	},
	/// The event did not change the attempt state
	NotUpdated,
}
631
632/// Participate in the new round attempt by submitting our round participation
633async fn start_attempt(
634	wallet: &Wallet,
635	participation: &RoundParticipation,
636	event: &RoundAttempt,
637) -> anyhow::Result<AttemptState> {
638	let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
639
640	// Assign cosign pubkeys to the payment requests.
641	let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
642		.take(participation.outputs.len())
643		.collect::<Vec<_>>();
644
645	// Prepare round participation info.
646	// For each of our requested vtxo output, we need a set of public and secret nonces.
647	let cosign_nonces = cosign_keys.iter()
648		.map(|key| {
649			let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
650			let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
651			for _ in 0..ark_info.nb_round_nonces {
652				let (s, p) = musig::nonce_pair(key);
653				secs.push(s);
654				pubs.push(p);
655			}
656			(secs, pubs)
657		})
658		.take(participation.outputs.len())
659		.collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
660
661
662	// The round has now started. We can submit our payment.
663	debug!("Submitting payment request with {} inputs and {} vtxo outputs",
664		participation.inputs.len(), participation.outputs.len(),
665	);
666
667	// Build signed requests with mailbox IDs
668	let unblinded_mailbox_id = wallet.mailbox_identifier();
669	let signed_reqs = participation.outputs.iter()
670		.zip(cosign_keys.iter())
671		.zip(cosign_nonces.iter())
672		.map(|((req, cosign_key), (_sec, pub_nonces))| {
673			SignedVtxoRequest {
674				vtxo: req.clone(),
675				cosign_pubkey: cosign_key.public_key(),
676				nonces: pub_nonces.clone(),
677			}
678		})
679		.collect::<Vec<_>>();
680
681	let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
682	for vtxo in participation.inputs.iter() {
683		let keypair = wallet.get_vtxo_key(vtxo).await
684			.map_err(HarkForfeitError::Err)?;
685		input_vtxos.push(protos::InputVtxo {
686			vtxo_id: vtxo.id().to_bytes().to_vec(),
687			attestation: {
688				let attestation = RoundAttemptAttestation::new(
689					event.challenge, vtxo.id(), &signed_reqs, &keypair,
690				);
691				attestation.serialize()
692			},
693		});
694	}
695
696	// Register VTXO transaction chains with server before round participation
697	wallet.register_vtxo_transactions_with_server(&participation.inputs).await
698		.map_err(HarkForfeitError::Err)?;
699
700	let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
701		input_vtxos: input_vtxos,
702		vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
703		#[allow(deprecated)]
704		offboard_requests: vec![],
705		unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
706	}).await.context("Ark server refused our payment submission")?;
707
708	Ok(AttemptState::AwaitingUnsignedVtxoTree {
709		unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
710		cosign_keys: cosign_keys,
711		secret_nonces: cosign_nonces.into_iter()
712			.map(|(sec, _pub)| sec.into_iter()
713				.map(DangerousSecretNonce::dangerous_from_secret_nonce)
714				.collect())
715			.collect(),
716	})
717}
718
/// just an internal type; need Error trait to work with anyhow
///
/// Distinguishes failures by whether our forfeit signatures were already sent
/// to the server, so that callers can record that fact in the round state.
#[derive(Debug, thiserror::Error)]
enum HarkForfeitError {
	/// An error happened after we sent forfeit signatures to the server
	#[error("error after forfeits were sent")]
	SentForfeits(#[source] anyhow::Error),
	/// An error happened before we sent forfeit signatures to the server
	#[error("error before forfeits were sent")]
	Err(#[source] anyhow::Error),
}
729
730async fn hark_cosign_leaf(
731	wallet: &Wallet,
732	srv: &mut ServerConnection,
733	funding_tx: &Transaction,
734	vtxo: &mut Vtxo<Full>,
735) -> anyhow::Result<()> {
736	let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
737		.context("error fetching keypair").map_err(HarkForfeitError::Err)?
738		.with_context(|| format!(
739			"keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
740		))?.1;
741	let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
742	let cosign_resp = srv.client.request_leaf_vtxo_cosign(
743		protos::LeafVtxoCosignRequest::from(cosign_req),
744	).await
745		.with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
746		.into_inner().try_into()
747		.context("bad leaf vtxo cosign response")?;
748	ensure!(ctx.finalize(vtxo, cosign_resp),
749		"failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
750	);
751
752	Ok(())
753}
754
755/// Finish the hArk VTXO swap protocol
756///
757/// This includes:
758/// - requesting cosignature of the locked hArk leaves
759/// - sending forfeit txs to the server in return for the unlock preimage
760///
761/// NB all the actions in this function are idempotent, meaning that the server
762/// allows them to be done multiple times. this means that if this function calls
763/// fails, it's safe to just call it again at a later time
764async fn hark_vtxo_swap(
765	wallet: &Wallet,
766	participation: &RoundParticipation,
767	output_vtxos: &mut [Vtxo<Full>],
768	funding_tx: &Transaction,
769	unlock_hash: UnlockHash,
770) -> Result<(), HarkForfeitError> {
771	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
772
773	// before we start make sure the server has our input vtxo signatures
774	wallet.register_vtxo_transactions_with_server(&participation.inputs).await
775		.context("couldn't send our input vtxo transactions to server")
776		.map_err(HarkForfeitError::Err)?;
777
778	// first get the leaves signed
779	for vtxo in output_vtxos.iter_mut() {
780		hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
781			.map_err(HarkForfeitError::Err)?;
782	}
783
784	// then do the forfeit dance
785
786	let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
787		unlock_hash: unlock_hash.to_byte_array().to_vec(),
788		vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
789	}).await
790		.context("request forfeits nonces call failed")
791		.map_err(HarkForfeitError::Err)?
792		.into_inner().public_nonces.into_iter()
793		.map(|b| musig::PublicNonce::from_bytes(b))
794		.collect::<Result<Vec<_>, _>>()
795		.context("invalid forfeit nonces")
796		.map_err(HarkForfeitError::Err)?;
797
798	if server_nonces.len() != participation.inputs.len() {
799		return Err(HarkForfeitError::Err(anyhow!(
800			"server sent {} nonce pairs, expected {}",
801			server_nonces.len(), participation.inputs.len(),
802		)));
803	}
804
805	let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
806	for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
807		let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
808			.ok().flatten().with_context(|| format!(
809				"failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
810			)).map_err(HarkForfeitError::Err)?.1;
811		forfeit_bundles.push(HashLockedForfeitBundle::new(
812			input, unlock_hash, &user_key, &nonces,
813		))
814	}
815
816	let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
817		forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
818	}).await
819		.context("forfeit vtxos call failed")
820		.map_err(HarkForfeitError::SentForfeits)?
821		.into_inner().unlock_preimage.as_slice().try_into()
822		.context("invalid preimage length")
823		.map_err(HarkForfeitError::SentForfeits)?;
824
825	for vtxo in output_vtxos.iter_mut() {
826		if !vtxo.provide_unlock_preimage(preimage) {
827			return Err(HarkForfeitError::SentForfeits(anyhow!(
828				"invalid preimage {} for vtxo {} with supposed unlock hash {}",
829				preimage.as_hex(), vtxo.id(), unlock_hash,
830			)));
831		}
832
833		// then validate the vtxo works
834		vtxo.validate(&funding_tx).with_context(|| format!(
835			"new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
836		)).map_err(HarkForfeitError::SentForfeits)?;
837	}
838
839	Ok(())
840}
841
842fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
843	match vtxo.validate(funding_tx) {
844		Err(VtxoValidationError::GenesisTransition {
845			genesis_idx, genesis_len, transition_kind, ..
846		}) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
847		Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
848			vtxo.serialize_hex(),
849		)),
850		Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
851	}
852}
853
854fn check_round_matches_participation(
855	part: &RoundParticipation,
856	new_vtxos: &[Vtxo<Full>],
857	funding_tx: &Transaction,
858) -> anyhow::Result<()> {
859	ensure!(new_vtxos.len() == part.outputs.len(),
860		"unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
861	);
862
863	for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
864		ensure!(vtxo.amount() == req.amount,
865			"unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
866		);
867		ensure!(*vtxo.policy() == req.policy,
868			"unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
869		);
870
871		// We accept the VTXO if only the hArk transition (last) failure happens
872		check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
873	}
874
875	Ok(())
876}
877
878/// Check the confirmation status of a funding tx
879///
880/// Returns true if the funding tx is confirmed deeply enough for us to accept it.
881/// The required number of confirmations depends on the wallet's configuration.
882///
883/// Returns false if the funding tx seems valid but not confirmed yet.
884///
885/// Returns an error if the chain source fails or if we can't submit the tx to the
886/// mempool, suggesting it might be double spent.
887async fn check_funding_tx_confirmations(
888	wallet: &Wallet,
889	funding_txid: Txid,
890	funding_tx: &Transaction,
891) -> anyhow::Result<bool> {
892	let tip = wallet.inner.chain.tip().await.context("chain source error")?;
893	let conf_height = tip - wallet.inner.config.round_tx_required_confirmations + 1;
894	let tx_status = wallet.inner.chain.tx_status(funding_txid).await.context("chain source error")?;
895	trace!("Round funding tx {} confirmation status: {:?} (tip={})",
896		funding_txid, tx_status, tip,
897	);
898	match tx_status {
899		TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
900		TxStatus::Mempool | TxStatus::Confirmed(_) => {
901			if wallet.inner.config.round_tx_required_confirmations == 0 {
902				debug!("Accepting round funding tx without confirmations because of configuration");
903				Ok(true)
904			} else {
905				trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
906				Ok(false)
907			}
908		},
909		TxStatus::NotFound => {
910			// let's try to submit it to our mempool
911			//TODO(stevenroose) change this to an explicit "testmempoolaccept" so that we can
912			// reliably distinguish the cases of our chain source having issues and the tx
913			// actually being rejected which suggests the round was double-spent
914			if let Err(e) = wallet.inner.chain.broadcast_tx(&funding_tx).await {
915				Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
916					funding_txid, serialize_hex(funding_tx), e,
917				))
918			} else {
919				trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
920				Ok(false)
921			}
922		},
923	}
924}
925
/// The outcome of an attempt to progress a non-interactive round participation
enum HarkProgressResult {
	/// The server reports that our participation is still pending.
	RoundPending,
	/// The server answered the status request with a "not found" error.
	RoundNotFound,
	/// The round has a funding tx, but it is not confirmed deeply enough yet.
	FundingTxUnconfirmed {
		/// The txid of the funding tx we are waiting for
		funding_txid: Txid,
	},
	/// The funding tx was accepted and the VTXO swap with the server succeeded.
	Ok {
		/// The funding tx of the round
		funding_tx: Transaction,
		/// The new VTXOs received from the server
		new_vtxos: Vec<Vtxo<Full>>,
	},
}
937
938async fn progress_non_interactive(
939	wallet: &Wallet,
940	participation: &RoundParticipation,
941	unlock_hash: UnlockHash,
942) -> Result<HarkProgressResult, HarkForfeitError> {
943	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
944
945	let resp = match srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
946		unlock_hash: unlock_hash.to_byte_array().to_vec(),
947	}).await {
948		Ok(resp) => resp.into_inner(),
949		Err(err) if err.code() == tonic::Code::NotFound => {
950			return Ok(HarkProgressResult::RoundNotFound);
951		},
952		Err(err) => {
953			return Err(HarkForfeitError::Err(
954				anyhow::Error::from(err).context("error checking round participation status"),
955			));
956		},
957	};
958	let status = protos::RoundParticipationStatus::try_from(resp.status)
959		.context("unknown status from server")
960		.map_err(HarkForfeitError::Err)	?;
961
962	if status == protos::RoundParticipationStatus::RoundPartPending {
963		trace!("Hark round still pending");
964		return Ok(HarkProgressResult::RoundPending);
965	}
966
967	// Since we got here, we clearly don't think we're finished.
968	// So even if the server thinks we did the dance before, we need the
969	// cosignature on the leaf tx so we need to do the dance again.
970	// "Guilty feet have got no rhythm."
971	if status == protos::RoundParticipationStatus::RoundPartReleased {
972		let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
973		warn!("Server says preimage was already released for hArk participation \
974			with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
975		);
976	}
977
978	let funding_tx_bytes = resp.round_funding_tx
979		.context("funding txid should be provided when status is not pending")
980		.map_err(HarkForfeitError::Err)?;
981	let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
982		.context("invalid funding txid")
983		.map_err(HarkForfeitError::Err)?;
984	let funding_txid = funding_tx.compute_txid();
985	trace!("Funding tx for round participation with unlock hash {}: {} ({})",
986		unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
987	);
988
989	// Check the confirmation status of the funding tx
990	match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
991		Ok(true) => {},
992		Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
993		Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
994	}
995
996	let mut new_vtxos = resp.output_vtxos.into_iter()
997		.map(|v| <Vtxo<Full>>::deserialize(&v))
998		.collect::<Result<Vec<_>, _>>()
999		.context("invalid output VTXOs from server")
1000		.map_err(HarkForfeitError::Err)?;
1001
1002	// Check that the vtxos match our participation in the exact order
1003	check_round_matches_participation(participation, &new_vtxos, &funding_tx)
1004		.context("new VTXOs received from server don't match our participation")
1005		.map_err(HarkForfeitError::Err)?;
1006
1007	hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
1008		.context("error forfeiting hArk VTXOs")
1009		.map_err(HarkForfeitError::SentForfeits)?;
1010
1011	Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
1012}
1013
1014async fn progress_attempt(
1015	state: &AttemptState,
1016	wallet: &Wallet,
1017	part: &RoundParticipation,
1018	event: &RoundEvent,
1019) -> AttemptProgressResult {
1020	// we will match only the states and messages required to make progress,
1021	// all else we ignore, except an unexpected finish
1022
1023	match (state, event) {
1024
1025		(
1026			AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
1027			RoundEvent::VtxoProposal(e),
1028		) => {
1029			trace!("Received VtxoProposal: {:#?}", e);
1030			match sign_vtxo_tree(
1031				wallet,
1032				part,
1033				&cosign_keys,
1034				&secret_nonces,
1035				&e.unsigned_round_tx,
1036				&e.vtxos_spec,
1037				&e.cosign_agg_nonces,
1038			).await {
1039				Ok(()) => {
1040					AttemptProgressResult::Updated {
1041						new_state: AttemptState::AwaitingFinishedRound {
1042							unsigned_round_tx: e.unsigned_round_tx.clone(),
1043							vtxos_spec: e.vtxos_spec.clone(),
1044							unlock_hash: *unlock_hash,
1045						},
1046					}
1047				},
1048				Err(e) => {
1049					trace!("Error signing VTXO tree: {:#}", e);
1050					AttemptProgressResult::Failed(e)
1051				},
1052			}
1053		},
1054
1055		(
1056			AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1057			RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1058		) => {
1059			if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1060				return AttemptProgressResult::Failed(anyhow!(
1061					"signed funding tx ({}) doesn't match tx received before ({})",
1062					signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1063				));
1064			}
1065
1066			if let Err(e) = wallet.inner.chain.broadcast_tx(&signed_round_tx).await {
1067				warn!("Failed to broadcast signed round tx: {:#}", e);
1068			}
1069
1070			match construct_new_vtxos(
1071				part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1072			).await {
1073				Ok(v) => AttemptProgressResult::Finished {
1074					funding_tx: signed_round_tx.clone(),
1075					vtxos: v,
1076					unlock_hash: *unlock_hash,
1077				},
1078				Err(e) => AttemptProgressResult::Failed(anyhow!(
1079					"failed to construct new VTXOs for round: {:#}", e,
1080				)),
1081			}
1082		},
1083
1084		(state, RoundEvent::Finished(RoundFinished { .. })) => {
1085			AttemptProgressResult::Failed(anyhow!(
1086				"unexpectedly received a finished round while we were in state {}",
1087				state.kind(),
1088			))
1089		},
1090
1091		(state, _) => {
1092			trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1093			AttemptProgressResult::NotUpdated
1094		},
1095	}
1096}
1097
1098async fn sign_vtxo_tree(
1099	wallet: &Wallet,
1100	participation: &RoundParticipation,
1101	cosign_keys: &[Keypair],
1102	secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1103	unsigned_round_tx: &Transaction,
1104	vtxo_tree: &VtxoTreeSpec,
1105	cosign_agg_nonces: &[musig::AggregatedNonce],
1106) -> anyhow::Result<()> {
1107	let (srv, _) = wallet.require_server().await.context("server not available")?;
1108
1109	let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1110
1111	// Check that the proposal contains our inputs.
1112	let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1113	for vtxo_req in vtxo_tree.iter_vtxos() {
1114		if let Some(i) = my_vtxos.iter().position(|v| {
1115			v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1116		}) {
1117			my_vtxos.swap_remove(i);
1118		}
1119	}
1120	if !my_vtxos.is_empty() {
1121		bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1122	}
1123
1124	let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1125	let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1126	trace!("Sending vtxo signatures to server...");
1127	let _ = try_join_all(iter.map(|((req, key), sec)| async {
1128		let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1129		let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1130		let part_sigs = unsigned_vtxos.cosign_branch(
1131			&cosign_agg_nonces, leaf_idx, key, secret_nonces,
1132		).context("failed to cosign branch: our request not part of tree")?;
1133
1134		info!("Sending {} partial vtxo cosign signatures for pk {}",
1135			part_sigs.len(), key.public_key(),
1136		);
1137
1138		let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1139			pubkey: key.public_key().serialize().to_vec(),
1140			signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1141		}).await.context("error sending vtxo signatures")?;
1142
1143		Result::<(), anyhow::Error>::Ok(())
1144	})).await.context("error sending VTXO signatures")?;
1145	trace!("Done sending vtxo signatures to server");
1146
1147	Ok(())
1148}
1149
1150async fn construct_new_vtxos(
1151	participation: &RoundParticipation,
1152	unsigned_round_tx: &Transaction,
1153	vtxo_tree: &VtxoTreeSpec,
1154	vtxo_cosign_sigs: &[schnorr::Signature],
1155) -> anyhow::Result<Vec<Vtxo<Full>>> {
1156	let round_txid = unsigned_round_tx.compute_txid();
1157	let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1158	let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1159
1160	// Validate the vtxo tree and cosign signatures.
1161	if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1162		// bad server!
1163		bail!("Received incorrect vtxo cosign signatures from server");
1164	}
1165
1166	let signed_vtxos = vtxo_tree
1167		.into_signed_tree(vtxo_cosign_sigs.to_vec())
1168		.into_cached_tree();
1169
1170	let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1171	let total_nb_expected_vtxos = expected_vtxos.len();
1172
1173	let mut new_vtxos = vec![];
1174	for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1175		if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1176			let vtxo = signed_vtxos.build_vtxo(idx);
1177
1178			// validate the received vtxos
1179			// This is more like a sanity check since we crafted them ourselves.
1180			check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1181				.context("constructed invalid vtxo from tree")?;
1182
1183			info!("New VTXO from round: {} ({}, {})",
1184				vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1185			);
1186
1187			new_vtxos.push(vtxo);
1188			expected_vtxos.swap_remove(expected_idx);
1189		}
1190	}
1191
1192	if !expected_vtxos.is_empty() {
1193		if expected_vtxos.len() == total_nb_expected_vtxos {
1194			// we must have done something wrong
1195			bail!("None of our VTXOs were present in round!");
1196		} else {
1197			bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1198				expected_vtxos.len(), expected_vtxos,
1199			);
1200		}
1201	}
1202	Ok(new_vtxos)
1203}
1204
1205//TODO(stevenroose) should be made idempotent
1206async fn persist_round_success(
1207	wallet: &Wallet,
1208	participation: &RoundParticipation,
1209	movement_id: Option<MovementId>,
1210	new_vtxos: &[Vtxo<Full>],
1211	funding_tx: &Transaction,
1212) -> anyhow::Result<()> {
1213	debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1214		new_vtxos.len(), movement_id,
1215	);
1216
1217	// we first try all actions that need to happen and only afterwards return errors
1218	// so that we achieve maximum success
1219
1220	let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1221		.context("failed to store new VTXOs");
1222	let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1223		.context("failed to mark input VTXOs as spent");
1224	let update_result = if let Some(mid) = movement_id {
1225		wallet.inner.movements.finish_movement_with_update(
1226			mid,
1227			MovementStatus::Successful,
1228			MovementUpdate::new()
1229				.produced_vtxos(new_vtxos)
1230				.metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1231		).await.context("failed to mark movement as finished")
1232	} else {
1233		Ok(())
1234	};
1235
1236	store_result?;
1237	spent_result?;
1238	update_result?;
1239
1240	Ok(())
1241}
1242
1243async fn persist_round_failure(
1244	wallet: &Wallet,
1245	participation: &RoundParticipation,
1246	movement_id: Option<MovementId>,
1247) -> anyhow::Result<()> {
1248	debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1249	let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1250	let finish_result = if let Some(movement_id) = movement_id {
1251		wallet.inner.movements.finish_movement(movement_id, MovementStatus::Failed).await
1252	} else {
1253		Ok(())
1254	};
1255	if let Err(e) = &finish_result {
1256		error!("Failed to mark movement as failed: {:#}", e);
1257	}
1258	match (unlock_result, finish_result) {
1259		(Ok(()), Ok(())) => Ok(()),
1260		(Err(e), _) => Err(e),
1261		(_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1262	}
1263}
1264
1265async fn update_funding_txid(
1266	wallet: &Wallet,
1267	movement_id: MovementId,
1268	funding_txid: Txid,
1269) -> anyhow::Result<()> {
1270	wallet.inner.movements.update_movement(
1271		movement_id,
1272		MovementUpdate::new()
1273			.metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1274	).await.context("Unable to update funding txid of round")
1275}
1276
1277impl Wallet {
1278	/// Load and lock a single given round state (by id), waiting for the lock.
1279	///
1280	/// Returns `Some(state)` if the round state is found and locked, `None`
1281	/// if it is not found after acquiring the lock.
1282	pub async fn lock_wait_round_state(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState>> {
1283		let guard = self.inner.lock_manager.lock(
1284			&format!("{}.round.{}", self.fingerprint(), id),
1285			ROUND_LOCK_TIMEOUT,
1286		).await.with_context(|| format!(
1287			"timed out waiting for lock on round state {} (wallet {})",
1288			id, self.fingerprint(),
1289		))?;
1290
1291		if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1292			return Ok(Some(state.lock(guard)));
1293		}
1294
1295		Ok(None)
1296	}
1297
1298	/// Ask the server when the next round is scheduled to start
1299	pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1300		let (mut srv, _) = self.require_server().await?;
1301		let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1302		Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1303	}
1304
1305	/// Start a new round participation
1306	///
1307	/// This function will store the state in the db and mark the VTXOs as locked.
1308	///
1309	/// ### Return
1310	///
1311	/// - By default, the returned state will be locked to prevent race conditions.
1312	/// To unlock the state, [StoredRoundState::unlock()] can be called.
1313	pub async fn join_next_round(
1314		&self,
1315		participation: RoundParticipation,
1316		movement_kind: Option<RoundMovement>,
1317	) -> anyhow::Result<StoredRoundState> {
1318		let movement_id = if let Some(kind) = movement_kind {
1319			Some(self.inner.movements.new_movement_with_update(
1320				Subsystem::ROUND,
1321				kind.to_string(),
1322				participation.to_movement_update()?
1323			).await?)
1324		} else {
1325			None
1326		};
1327		let state = RoundState::new_interactive(participation, movement_id);
1328
1329		let id = self.inner.db.store_round_state_lock_vtxos(&state).await?;
1330		let state = self.lock_wait_round_state(id).await?
1331			.context("failed to lock fresh round state")?;
1332
1333		Ok(state)
1334	}
1335
1336	/// Join next round in non-interactive or delegated mode
1337	pub async fn join_next_round_delegated(
1338		&self,
1339		participation: RoundParticipation,
1340		movement_kind: Option<RoundMovement>,
1341	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1342		let (mut srv, _) = self.require_server().await?;
1343
1344		let movement_id = if let Some(kind) = movement_kind {
1345			Some(self.inner.movements.new_movement_with_update(
1346				Subsystem::ROUND, kind.to_string(), participation.to_movement_update()?,
1347			).await?)
1348		} else {
1349			None
1350		};
1351
1352		// Get mailbox identifier for VTXO delivery
1353		let unblinded_mailbox_id = self.mailbox_identifier();
1354
1355		// Register VTXO transaction chains with server before round participation
1356		self.register_vtxo_transactions_with_server(&participation.inputs).await
1357			.context("failed to register input vtxo transactions with server")?;
1358
1359		// Generate attestations for input vtxos
1360		let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1361		for vtxo in participation.inputs.iter() {
1362			let keypair = self.get_vtxo_key(vtxo).await
1363				.context("failed to get vtxo keypair")?;
1364			input_vtxos.push(protos::InputVtxo {
1365				vtxo_id: vtxo.id().to_bytes().to_vec(),
1366				attestation: {
1367					let attestation = DelegatedRoundParticipationAttestation::new(
1368						vtxo.id(), &participation.outputs, &keypair,
1369					);
1370					attestation.serialize()
1371				},
1372			});
1373		}
1374
1375		// Build proto VtxoRequests
1376		let vtxo_requests = participation.outputs.iter()
1377			.map(|req|
1378				protos::VtxoRequest {
1379					policy: req.policy.serialize(),
1380					amount: req.amount.to_sat(),
1381			})
1382			.collect::<Vec<_>>();
1383
1384		// Submit participation to server and get unlock_hash
1385		let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1386			input_vtxos,
1387			vtxo_requests,
1388			unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
1389		}).await.context("error submitting round participation to server")?.into_inner();
1390
1391		let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1392			.context("invalid unlock hash from server")?;
1393
1394		let state = RoundState::new_non_interactive(participation, unlock_hash, movement_id);
1395
1396		info!("Non-interactive round participation submitted, it will automatically execute \
1397			when you next sync your wallet after the round happened \
1398			(and has sufficient confirmations).",
1399		);
1400
1401		let id = self.inner.db.store_round_state_lock_vtxos(&state).await?;
1402		Ok(StoredRoundState::new(id, state))
1403	}
1404
1405	/// Get all pending round states
1406	pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1407		self.inner.db.get_pending_round_state_ids().await
1408	}
1409
1410	/// Get all pending round states
1411	pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1412		let ids = self.inner.db.get_pending_round_state_ids().await?;
1413		let mut states = Vec::with_capacity(ids.len());
1414		for id in ids {
1415			if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1416				states.push(state);
1417			}
1418		}
1419		Ok(states)
1420	}
1421
1422	/// Balance locked in pending rounds
1423	pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1424		let mut ret = Amount::ZERO;
1425		for round in self.pending_round_states().await? {
1426			ret += round.state().pending_balance();
1427		}
1428		Ok(ret)
1429	}
1430
1431	/// Returns all VTXOs that are locked in a pending round
1432	///
1433	/// This excludes all input VTXOs for which the output VTXOs have already
1434	/// been created.
1435	pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1436		let mut ret = Vec::new();
1437		for round in self.pending_round_states().await? {
1438			let inputs = round.state().locked_pending_inputs();
1439			ret.reserve(inputs.len());
1440			for input in inputs {
1441				let v = self.get_vtxo_by_id(input.id()).await
1442					.context("unknown round input VTXO")?;
1443				ret.push(v);
1444			}
1445		}
1446		Ok(ret)
1447	}
1448
1449	/// Sync pending rounds that have finished but are waiting for confirmations
1450	pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1451		let states = self.pending_round_states().await?;
1452		if states.is_empty() {
1453			return Ok(());
1454		}
1455
1456		debug!("Syncing {} pending round states...", states.len());
1457
1458		tokio_stream::iter(states).for_each_concurrent(10, |state| async move {
1459			// not processing events here
1460			if state.state().ongoing_participation() {
1461				return;
1462			}
1463
1464			let mut state = match self.lock_wait_round_state(state.id()).await {
1465				Ok(Some(state)) => state,
1466				Ok(None) => return,
1467				Err(e) => {
1468					warn!("Error locking round state: {:#}", e);
1469					return;
1470				},
1471			};
1472
1473			let status = state.state_mut().sync(self).await;
1474			trace!("Synced round #{}, status: {:?}", state.id(), status);
1475			match status {
1476				Ok(RoundStatus::Confirmed { funding_txid }) => {
1477					info!("Round confirmed. Funding tx {}", funding_txid);
1478					if let Err(e) = self.inner.db.remove_round_state(&state).await {
1479						warn!("Error removing confirmed round state from db: {:#}", e);
1480					}
1481				},
1482				Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1483					info!("Waiting for confirmations for round funding tx {}", funding_txid);
1484					if let Err(e) = self.inner.db.update_round_state(&state).await {
1485						warn!("Error updating pending round state in db: {:#}", e);
1486					}
1487				},
1488				Ok(RoundStatus::Pending) => {
1489					if let Err(e) = self.inner.db.update_round_state(&state).await {
1490						warn!("Error updating pending round state in db: {:#}", e);
1491					}
1492				},
1493				Ok(RoundStatus::Failed { error }) => {
1494					error!("Round failed: {}", error);
1495					if let Err(e) = self.inner.db.remove_round_state(&state).await {
1496						warn!("Error removing failed round state from db: {:#}", e);
1497					}
1498				},
1499				Ok(RoundStatus::Canceled) => {
1500					error!("Round canceled");
1501					if let Err(e) = self.inner.db.remove_round_state(&state).await {
1502						warn!("Error removing canceled round state from db: {:#}", e);
1503					}
1504				},
1505				Err(e) => warn!("Error syncing round: {:#}", e),
1506			}
1507		}).await;
1508
1509		Ok(())
1510	}
1511
1512	/// Fetch last round event from server
1513	async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1514		let (mut srv, _) = self.require_server().await?;
1515		let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1516		Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1517	}
1518
1519	async fn inner_process_event(
1520		&self,
1521		state: &mut StoredRoundState,
1522		event: Option<&RoundEvent>,
1523	) {
1524		if let Some(event) = event && state.state().ongoing_participation() {
1525			let updated = state.state_mut().process_event(self, &event).await;
1526			if updated {
1527				if let Err(e) = self.inner.db.update_round_state(&state).await {
1528					error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1529				}
1530			}
1531		}
1532
1533		match state.state_mut().sync(self).await {
1534			Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1535			Ok(s) if s.is_final() => {
1536				info!("Round #{} finished with result: {:?}", state.id(), s);
1537				if let Err(e) = self.inner.db.remove_round_state(&state).await {
1538					warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1539				}
1540			},
1541			Ok(s) => {
1542				trace!("Round state #{} is now in state {:?}", state.id(), s);
1543				if let Err(e) = self.inner.db.update_round_state(&state).await {
1544					warn!("Error storing round state #{}: {:#}", state.id(), e);
1545				}
1546			},
1547		}
1548	}
1549
1550	/// Try to make incremental progress on all pending round states
1551	///
1552	/// If the `last_round_event` argument is not provided, it will be fetched
1553	/// from the server.
1554	pub async fn progress_pending_rounds(
1555		&self,
1556		last_round_event: Option<&RoundEvent>,
1557	) -> anyhow::Result<()> {
1558		let states = self.pending_round_states().await?;
1559		if states.is_empty() {
1560			return Ok(());
1561		}
1562
1563		info!("Processing {} rounds...", states.len());
1564
1565		let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1566
1567		let has_ongoing_participation = states.iter()
1568			.any(|s| s.state().ongoing_participation());
1569		if has_ongoing_participation && last_round_event.is_none() {
1570			match self.get_last_round_event().await {
1571				Ok(e) => last_round_event = Some(Cow::Owned(e)),
1572				Err(e) => {
1573					warn!("Error fetching round event, \
1574						failed to progress ongoing rounds: {:#}", e);
1575				},
1576			}
1577		}
1578
1579		let event = last_round_event.as_ref().map(|c| c.as_ref());
1580
1581		let futs = states.into_iter().map(async |state| {
1582			let locked = self.lock_wait_round_state(state.id()).await?;
1583			if let Some(mut locked) = locked {
1584				self.inner_process_event(&mut locked, event).await;
1585			}
1586			Ok::<_, anyhow::Error>(())
1587		});
1588
1589		futures::future::join_all(futs).await;
1590
1591		Ok(())
1592	}
1593
1594	pub async fn subscribe_round_events(&self)
1595		-> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1596	{
1597		let (mut srv, _) = self.require_server().await?;
1598		let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1599			.into_inner().map(|m| {
1600				let m = m.context("received error on event stream")?;
1601				let e = RoundEvent::try_from(m.clone())
1602					.with_context(|| format!("error converting rpc round event: {:?}", m))?;
1603				trace!("Received round event: {}", e);
1604				Ok::<_, anyhow::Error>(e)
1605			});
1606		Ok(events)
1607	}
1608
1609	/// A blocking call that will try to perform a full round participation
1610	/// for all ongoing rounds
1611	///
1612	/// Returns only once there is no ongoing rounds anymore.
1613	pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1614		let mut events = self.subscribe_round_events().await?;
1615
1616		loop {
1617			// NB: we need to load all ongoing rounds on every iteration here
1618			// because some might be finished by another call
1619			let state_ids = self.pending_round_states().await?.iter()
1620				.filter(|s| s.state().ongoing_participation())
1621				.map(|s| s.id())
1622				.collect::<Vec<_>>();
1623
1624			if state_ids.is_empty() {
1625				info!("All rounds handled");
1626				return Ok(());
1627			}
1628
1629			let event = events.next().await
1630				.context("events stream broke")?
1631				.context("error on event stream")?;
1632
1633			let futs = state_ids.into_iter().map(async |state| {
1634				let locked = self.lock_wait_round_state(state).await?;
1635				if let Some(mut locked) = locked {
1636					self.inner_process_event(&mut locked, Some(&event)).await;
1637				}
1638				Ok::<_, anyhow::Error>(())
1639			});
1640
1641			futures::future::join_all(futs).await;
1642		}
1643	}
1644
1645	/// Will cancel all pending rounds that can safely be canceled
1646	///
1647	/// All rounds that have not started yet can safely be canceled,
1648	/// as well as rounds where we have not yet signed any forfeit txs.
1649	pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1650		// initial load to get all pending round states ids
1651		let state_ids = self.inner.db.get_pending_round_state_ids().await?;
1652
1653		let futures = state_ids.into_iter().map(|state_id| {
1654			async move {
1655				// wait for lock and load again to ensure most recent state
1656				let mut state = match self.lock_wait_round_state(state_id).await {
1657					Ok(Some(s)) => s,
1658					Ok(None) => return,
1659					Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1660				};
1661
1662				match state.state_mut().try_cancel(self).await {
1663					Ok(true) => {
1664						if let Err(e) = self.inner.db.remove_round_state(&state).await {
1665							warn!("Error removing canceled round state from db: {:#}", e);
1666						}
1667					},
1668					Ok(false) => {},
1669					Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1670				}
1671			}
1672		});
1673
1674		join_all(futures).await;
1675
1676		Ok(())
1677	}
1678
1679	/// Try to cancel the given round
1680	pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1681		let mut state = self.lock_wait_round_state(id).await?
1682			.context("round state not found")?;
1683
1684		if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1685			self.inner.db.remove_round_state(&state).await
1686				.context("error removing canceled round state from db")?;
1687		} else {
1688			bail!("failed to cancel round");
1689		}
1690
1691		Ok(())
1692	}
1693
1694	/// Participate in a round
1695	///
1696	/// This function will start a new round participation and block until
1697	/// the round is finished.
1698	/// After this method returns the round state will be kept active until
1699	/// the round tx fully confirms.
1700	pub(crate) async fn participate_round(
1701		&self,
1702		participation: RoundParticipation,
1703		movement_kind: Option<RoundMovement>,
1704	) -> anyhow::Result<RoundStatus> {
1705		let mut state = self.join_next_round(participation, movement_kind).await?;
1706
1707		info!("Waiting for a round start...");
1708		let mut events = self.subscribe_round_events().await?;
1709
1710		loop {
1711			if !state.state().ongoing_participation() {
1712				let status = state.state_mut().sync(self).await?;
1713				match status {
1714					RoundStatus::Failed { error } => bail!("round failed: {}", error),
1715					RoundStatus::Canceled => bail!("round canceled"),
1716					status => return Ok(status),
1717				}
1718			}
1719
1720			let event = events.next().await
1721				.context("events stream broke")?
1722				.context("error on event stream")?;
1723			if state.state_mut().process_event(self, &event).await {
1724				self.inner.db.update_round_state(&state).await?;
1725			}
1726		}
1727	}
1728}