Skip to main content

bark/round/
mod.rs

1//!
2//! Round State Machine
3//!
4
5mod lock;
6
7pub(crate) use lock::{RoundStateGuard, RoundStateLockIndex};
8
9use std::iter;
10use std::borrow::Cow;
11use std::convert::Infallible;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use anyhow::Context;
15use ark::vtxo::VtxoValidationError;
16use bdk_esplora::esplora_client::Amount;
17use bip39::rand;
18use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
19use bitcoin::consensus::encode::{deserialize, serialize_hex};
20use bitcoin::hashes::Hash;
21use bitcoin::hex::DisplayHex;
22use bitcoin::key::Keypair;
23use bitcoin::secp256k1::schnorr;
24use futures::future::{join_all, try_join_all};
25use futures::{Stream, StreamExt};
26use log::{debug, error, info, trace, warn};
27
28use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
29use ark::vtxo::Full;
30use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
31use ark::mailbox::MailboxIdentifier;
32use ark::forfeit::HashLockedForfeitBundle;
33use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
34use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
35use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
36use bitcoin_ext::TxStatus;
37use server_rpc::{protos, ServerConnection, TryFromBytes};
38
39use crate::{SECP, Wallet, WalletVtxo};
40use crate::movement::{MovementId, MovementStatus};
41use crate::movement::update::MovementUpdate;
42use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
43use crate::subsystem::{RoundMovement, Subsystem};
44
45
46/// The type string for the hArk leaf transition
47const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
48
49/// Struct to communicate your specific participation for an Ark round.
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct RoundParticipation {
52	#[serde(with = "ark::encode::serde::vec")]
53	pub inputs: Vec<Vtxo<Full>>,
54	/// The output VTXOs that we request in the round,
55	/// including change
56	pub outputs: Vec<VtxoRequest>,
57	/// Optional mailbox identifier for round completion notification
58	#[serde(default, skip_serializing_if = "Option::is_none")]
59	pub unblinded_mailbox_id: Option<MailboxIdentifier>,
60}
61
62impl RoundParticipation {
63	pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
64		let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
65		let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
66		let fee = input_amount - output_amount;
67		Ok(MovementUpdate::new()
68			.consumed_vtxos(&self.inputs)
69			.intended_balance(SignedAmount::ZERO)
70			.effective_balance( - fee.to_signed()?)
71			.fee(fee)
72		)
73	}
74}
75
76#[derive(Debug, Clone)]
77pub enum RoundStatus {
78	/// The round was successful and is fully confirmed
79	Confirmed {
80		funding_txid: Txid,
81	},
82	/// Round successful but not fully confirmed
83	Unconfirmed {
84		funding_txid: Txid,
85	},
86	/// Round didn't finish yet
87	Pending,
88	/// The round failed
89	Failed {
90		error: String,
91	},
92	/// User canceled the round
93	Canceled,
94}
95
96impl RoundStatus {
97	/// Whether this is the final state and it won't change anymore
98	pub fn is_final(&self) -> bool {
99		match self {
100			Self::Confirmed { .. } => true,
101			Self::Unconfirmed { .. } => false,
102			Self::Pending => false,
103			Self::Failed { .. } => true,
104			Self::Canceled => true,
105		}
106	}
107
108	/// Whether it looks like the round succeeded
109	pub fn is_success(&self) -> bool {
110		match self {
111			Self::Confirmed { .. } => true,
112			Self::Unconfirmed { .. } => true,
113			Self::Pending => false,
114			Self::Failed { .. } => false,
115			Self::Canceled => false,
116		}
117	}
118}
119
120/// State of the progress of a round participation
121///
122/// An instance of this struct is kept all the way from the intention of joining
123/// the next round, until either the round fully confirms or it fails and we are
124/// sure it won't have any effect on our wallet.
125///
126/// As soon as we have signed forfeit txs for the round, we keep track of this
127/// round attempt until we see another attempt we participated in confirm or
128/// we gain confidence that the failed attempt will never confirm.
129//
130//TODO(stevenroose) move the id in here and have the state persist itself with the wallet
131// to have better control. this way we can touch db before we sent forfeit sigs
132pub struct RoundState {
133	/// Round is fully done
134	pub(crate) done: bool,
135
136	/// Our participation in this round
137	pub(crate) participation: RoundParticipation,
138
139	/// The flow of the round in case it is still ongoing with the server
140	pub(crate) flow: RoundFlowState,
141
142	/// The new output vtxos of this round participation
143	///
144	/// After we finish the interactive part, we fill this with the uncompleted
145	/// VTXOs which we then try to complete with the unlock preimage.
146	pub(crate) new_vtxos: Vec<Vtxo<Full>>,
147
148	/// Whether we sent our forfeit signatures to the server
149	///
150	/// If we did this and the server refused to reveal our new VTXOs,
151	/// we will be forced to exit.
152	//TODO(stevenroose) implement exit when this is true and we can't make progress
153	// probably based on the input vtxos becoming close to expiry
154	pub(crate) sent_forfeit_sigs: bool,
155
156	/// The ID of the [Movement] associated with this round
157	pub(crate) movement_id: Option<MovementId>,
158}
159
160impl RoundState {
161	fn new_interactive(
162		participation: RoundParticipation,
163		movement_id: Option<MovementId>,
164	) -> Self {
165		Self {
166			participation,
167			movement_id,
168			flow: RoundFlowState::InteractivePending,
169			new_vtxos: Vec::new(),
170			sent_forfeit_sigs: false,
171			done: false,
172		}
173	}
174
175	fn new_non_interactive(
176		participation: RoundParticipation,
177		unlock_hash: UnlockHash,
178		movement_id: Option<MovementId>,
179	) -> Self {
180		Self {
181			participation,
182			movement_id,
183			flow: RoundFlowState::NonInteractivePending { unlock_hash },
184			new_vtxos: Vec::new(),
185			sent_forfeit_sigs: false,
186			done: false,
187		}
188	}
189
190	/// Our participation in this round
191	pub fn participation(&self) -> &RoundParticipation {
192		&self.participation
193	}
194
195	/// the unlock hash if already known
196	pub fn unlock_hash(&self) -> Option<UnlockHash> {
197		match self.flow {
198			RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
199			RoundFlowState::InteractivePending => None,
200			RoundFlowState::InteractiveOngoing { .. } => None,
201			RoundFlowState::Failed { .. } => None,
202			RoundFlowState::Canceled => None,
203			RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
204		}
205	}
206
207	pub fn funding_tx(&self) -> Option<&Transaction> {
208		match self.flow {
209			RoundFlowState::NonInteractivePending { .. } => None,
210			RoundFlowState::InteractivePending => None,
211			RoundFlowState::InteractiveOngoing { .. } => None,
212			RoundFlowState::Failed { .. } => None,
213			RoundFlowState::Canceled => None,
214			RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
215		}
216	}
217
218	/// Whether the interactive part of the round is still ongoing
219	pub fn ongoing_participation(&self) -> bool {
220		match self.flow {
221			RoundFlowState::NonInteractivePending { .. } => false,
222			RoundFlowState::InteractivePending => true,
223			RoundFlowState::InteractiveOngoing { .. } => true,
224			RoundFlowState::Failed { .. } => false,
225			RoundFlowState::Canceled => false,
226			RoundFlowState::Finished { .. } => false,
227		}
228	}
229
230	/// Tries to cancel the round and returns whether it was succesfully canceled
231	/// or if it was already canceled or failed
232	pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
233		let ret = match self.flow {
234			RoundFlowState::NonInteractivePending { .. } => todo!("we have to cancel with server!"),
235			RoundFlowState::Canceled => true,
236			RoundFlowState::Failed { .. } => true,
237			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
238				self.flow = RoundFlowState::Canceled;
239				true
240			},
241			RoundFlowState::Finished { .. } => false,
242		};
243		if ret {
244			persist_round_failure(wallet, &self.participation, self.movement_id).await
245				.context("failed to persist round failure for cancelation")?;
246		}
247		Ok(ret)
248	}
249
250	async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
251		match start_attempt(wallet, &self.participation, attempt).await {
252			Ok(state) => {
253				self.flow = RoundFlowState::InteractiveOngoing {
254					round_seq: attempt.round_seq,
255					attempt_seq: attempt.attempt_seq,
256					state: state,
257				};
258			},
259			Err(e) => {
260				self.flow = RoundFlowState::Failed {
261					error: format!("{:#}", e),
262				};
263			},
264		}
265	}
266
267	/// Processes the given event and returns true if some update was made to the state
268	pub async fn process_event(
269		&mut self,
270		wallet: &Wallet,
271		event: &RoundEvent,
272	) -> bool {
273		let _: Infallible = match self.flow {
274			RoundFlowState::InteractivePending => {
275				if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
276					trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
277					self.try_start_attempt(wallet, e).await;
278					return true;
279				} else {
280					trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
281						event.kind(), event.round_seq(), event.attempt_seq(),
282					);
283					return false;
284				}
285			},
286			RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
287				// here we catch the cases where we're in a wrong flow
288
289				if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
290					warn!("Round {} failed by server", round_seq);
291					self.flow = RoundFlowState::Failed {
292						error: format!("round {} failed by server", round_seq),
293					};
294					return true;
295				}
296
297				if event.round_seq() > round_seq {
298					// new round started, we don't support multiple parallel rounds,
299					// this means we failed
300					self.flow = RoundFlowState::Failed {
301						error: format!("round {} started while we were on {}",
302							event.round_seq(), round_seq,
303						),
304					};
305					return true;
306				}
307
308				if event.attempt_seq() < attempt_seq {
309					trace!("ignoring replayed message from old attempt");
310					return false;
311				}
312
313				if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
314					trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
315					self.try_start_attempt(wallet, e).await;
316					return true;
317				}
318				trace!("Processing event {} for round attempt {}:{} in state {}",
319					event.kind(), round_seq, attempt_seq, state.kind(),
320				);
321
322				return match progress_attempt(state, wallet, &self.participation, event).await {
323					AttemptProgressResult::NotUpdated => false,
324					AttemptProgressResult::Updated { new_state } => {
325						*state = new_state;
326						true
327					},
328					AttemptProgressResult::Failed(e) => {
329						warn!("Round failed with error: {:#}", e);
330						self.flow = RoundFlowState::Failed {
331							error: format!("{:#}", e),
332						};
333						true
334					},
335					AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
336						self.new_vtxos = vtxos;
337						let funding_txid = funding_tx.compute_txid();
338						self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
339						if let Some(mid) = self.movement_id {
340							if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
341								warn!("Error updating the round funding txid: {:#}", e);
342							}
343						}
344						true
345					},
346				};
347			},
348			RoundFlowState::NonInteractivePending { .. }
349				| RoundFlowState::Finished { .. }
350				| RoundFlowState::Failed { .. }
351				| RoundFlowState::Canceled => return false,
352		};
353	}
354
355	/// Sync the round's status and return it
356	///
357	/// When success or failure is returned, the round state can be eliminated
358	//TODO(stevenroose) make RoundState manage its own db record
359	pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
360		match self.flow {
361			RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
362				Ok(RoundStatus::Confirmed {
363					funding_txid: funding_tx.compute_txid(),
364				})
365			},
366
367			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
368				Ok(RoundStatus::Pending)
369			},
370			RoundFlowState::Failed { ref error } => {
371				persist_round_failure(wallet, &self.participation, self.movement_id).await
372					.context("failed to persist round failure")?;
373				Ok(RoundStatus::Failed { error: error.clone() })
374			},
375			RoundFlowState::Canceled => {
376				persist_round_failure(wallet, &self.participation, self.movement_id).await
377					.context("failed to persist round failure")?;
378				Ok(RoundStatus::Canceled)
379			},
380
381			RoundFlowState::NonInteractivePending { unlock_hash } => {
382				match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
383					Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
384					Ok(HarkProgressResult::RoundNotFound) => {
385						self.handle_round_not_found(wallet).await
386					},
387					Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
388						let funding_txid = funding_tx.compute_txid();
389						self.new_vtxos = new_vtxos;
390						self.flow = RoundFlowState::Finished {
391							funding_tx: funding_tx.clone(),
392							unlock_hash: unlock_hash,
393						};
394
395						persist_round_success(
396							wallet,
397							&self.participation,
398							self.movement_id,
399							&self.new_vtxos,
400							&funding_tx,
401						).await.context("failed to store successful round in DB!")?;
402
403						self.done = true;
404
405						Ok(RoundStatus::Confirmed { funding_txid })
406					},
407					Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
408						if let Some(mid) = self.movement_id {
409							update_funding_txid(wallet, mid, funding_txid).await
410								.context("failed to update funding txid in DB")?;
411						}
412						Ok(RoundStatus::Unconfirmed { funding_txid })
413					},
414
415					//TODO(stevenroose) should we mark as failed for these cases?
416
417					Err(HarkForfeitError::Err(e)) => {
418						//TODO(stevenroose) we failed here but we might actualy be able to
419						// succeed if we retry. should we implement some kind of limited
420						// retry after which we mark as failed?
421						Err(e.context("error progressing non-interactive round"))
422					},
423					Err(HarkForfeitError::SentForfeits(e)) => {
424						self.sent_forfeit_sigs = true;
425						Err(e.context("error progressing non-interactive round \
426							after sending forfeit tx signatures"))
427					},
428				}
429			},
430			// interactive part finished, but didn't forfeit yet
431			RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
432				let funding_txid = funding_tx.compute_txid();
433				let confirmed = check_funding_tx_confirmations(
434					wallet, funding_txid, &funding_tx,
435				).await.context("error checking funding tx confirmations")?;
436				if !confirmed {
437					trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
438					return Ok(RoundStatus::Unconfirmed { funding_txid });
439				}
440
441				match hark_vtxo_swap(
442					wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
443				).await {
444					Ok(()) => {
445						persist_round_success(
446							wallet,
447							&self.participation,
448							self.movement_id,
449							&self.new_vtxos,
450							&funding_tx,
451						).await.context("failed to store successful round in DB!")?;
452
453						self.done = true;
454
455						Ok(RoundStatus::Confirmed { funding_txid })
456					},
457					Err(HarkForfeitError::Err(e)) => {
458						Err(e.context("error forfeiting VTXOs after round"))
459					},
460					Err(HarkForfeitError::SentForfeits(e)) => {
461						self.sent_forfeit_sigs = true;
462						Err(e.context("error after having signed and sent \
463							forfeit signatures to server"))
464					},
465				}
466			},
467		}
468	}
469
470	/// Once we know the signed round funding tx, this returns the output VTXOs
471	/// for this round.
472	pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
473		if self.new_vtxos.is_empty() {
474			None
475		} else {
476			Some(&self.new_vtxos)
477		}
478	}
479
480	/// Returns the input VTXOs that are locked in this round, but only
481	/// if no output VTXOs were issued yet.
482	pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
483		//TODO(stevenroose) consider if we can't just drop the state after forfeit exchange
484		match self.flow {
485			RoundFlowState::NonInteractivePending { .. }
486				| RoundFlowState::InteractivePending
487				| RoundFlowState::InteractiveOngoing { .. }
488			=> {
489				&self.participation.inputs
490			},
491			RoundFlowState::Finished { .. } => if self.done {
492				// inputs already unlocked
493				&[]
494			} else {
495				&self.participation.inputs
496			},
497			RoundFlowState::Failed { .. }
498				| RoundFlowState::Canceled
499			=> {
500				// inputs already unlocked
501				&[]
502			},
503		}
504	}
505
506	/// The balance pending in this round
507	///
508	/// This becomes zero once the new round VTXOs are unlocked.
509	pub fn pending_balance(&self) -> Amount {
510		if self.done {
511			return Amount::ZERO;
512		}
513
514		match self.flow {
515			RoundFlowState::NonInteractivePending { .. }
516				| RoundFlowState::InteractivePending
517				| RoundFlowState::InteractiveOngoing { .. }
518				| RoundFlowState::Finished { .. }
519			=> {
520				self.participation.outputs.iter().map(|o| o.amount).sum()
521			},
522			RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
523				Amount::ZERO
524			},
525		}
526	}
527
528	/// Handle the case where the server reports our round participation as not found.
529	///
530	/// If we sent forfeit signatures (which only happens after the round was
531	/// confirmed), this is adversarial — trigger unilateral exit.
532	/// If we never sent forfeits, the server can't steal, so unlock the VTXOs
533	/// and verify with the server that they're still considered spendable.
534	async fn handle_round_not_found(
535		&mut self,
536		wallet: &Wallet,
537	) -> anyhow::Result<RoundStatus> {
538		info!("Server reports round participation not found (no forfeits sent)");
539		self.flow = RoundFlowState::Failed {
540			error: "server reports round participation not found".into(),
541		};
542		persist_round_failure(wallet, &self.participation, self.movement_id).await
543			.context("failed to persist round failure")?;
544
545		Ok(RoundStatus::Failed {
546			error: "server reports round participation not found".into(),
547		})
548	}
549}
550
551/// The state of the process flow of a round
552///
553/// This tracks the progress of the interactive part of the round, from
554/// waiting to start until finishing either succesfully or with a failure.
555pub enum RoundFlowState {
556	/// We don't do flow and we just wait for the round to finish
557	NonInteractivePending {
558		unlock_hash: UnlockHash,
559	},
560
561	/// Waiting for round to happen
562	InteractivePending,
563	/// Interactive part ongoing
564	InteractiveOngoing {
565		round_seq: RoundSeq,
566		attempt_seq: usize,
567		state: AttemptState,
568	},
569
570	/// Interactive part finished, waiting for confirmation
571	Finished {
572		funding_tx: Transaction,
573		unlock_hash: UnlockHash,
574	},
575
576	/// Failed during round
577	Failed {
578		error: String,
579	},
580
581	/// User canceled round
582	Canceled,
583}
584
585/// The state of a single round attempt
586///
587/// For each attempt that we participate in, we keep the state of our concrete
588/// participation.
589pub enum AttemptState {
590	AwaitingAttempt,
591	AwaitingUnsignedVtxoTree {
592		cosign_keys: Vec<Keypair>,
593		secret_nonces: Vec<Vec<DangerousSecretNonce>>,
594		unlock_hash: UnlockHash,
595	},
596	AwaitingFinishedRound {
597		unsigned_round_tx: Transaction,
598		vtxos_spec: VtxoTreeSpec,
599		unlock_hash: UnlockHash,
600	},
601}
602
603impl AttemptState {
604	/// The state kind represented as a string
605	fn kind(&self) -> &'static str {
606		match self {
607			Self::AwaitingAttempt => "AwaitingAttempt",
608			Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
609			Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
610		}
611	}
612}
613
614/// Result from trying to progress an ongoing round attempt
615enum AttemptProgressResult {
616	Finished {
617		funding_tx: Transaction,
618		vtxos: Vec<Vtxo<Full>>,
619		unlock_hash: UnlockHash,
620	},
621	Failed(anyhow::Error),
622	/// When the state changes, this variant is returned
623	///
624	/// If during the processing, we have signed any forfeit txs and tried
625	/// sending them to the server, the [UnconfirmedRound] instance is returned
626	/// so that it can be stored in the state.
627	Updated {
628		new_state: AttemptState,
629	},
630	NotUpdated,
631}
632
633/// Participate in the new round attempt by submitting our round participation
634async fn start_attempt(
635	wallet: &Wallet,
636	participation: &RoundParticipation,
637	event: &RoundAttempt,
638) -> anyhow::Result<AttemptState> {
639	let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
640
641	// Assign cosign pubkeys to the payment requests.
642	let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
643		.take(participation.outputs.len())
644		.collect::<Vec<_>>();
645
646	// Prepare round participation info.
647	// For each of our requested vtxo output, we need a set of public and secret nonces.
648	let cosign_nonces = cosign_keys.iter()
649		.map(|key| {
650			let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
651			let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
652			for _ in 0..ark_info.nb_round_nonces {
653				let (s, p) = musig::nonce_pair(key);
654				secs.push(s);
655				pubs.push(p);
656			}
657			(secs, pubs)
658		})
659		.take(participation.outputs.len())
660		.collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
661
662
663	// The round has now started. We can submit our payment.
664	debug!("Submitting payment request with {} inputs and {} vtxo outputs",
665		participation.inputs.len(), participation.outputs.len(),
666	);
667
668	// Build signed requests with mailbox IDs
669	let unblinded_mailbox_id = wallet.mailbox_identifier();
670	let signed_reqs = participation.outputs.iter()
671		.zip(cosign_keys.iter())
672		.zip(cosign_nonces.iter())
673		.map(|((req, cosign_key), (_sec, pub_nonces))| {
674			SignedVtxoRequest {
675				vtxo: req.clone(),
676				cosign_pubkey: cosign_key.public_key(),
677				nonces: pub_nonces.clone(),
678			}
679		})
680		.collect::<Vec<_>>();
681
682	let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
683	for vtxo in participation.inputs.iter() {
684		let keypair = wallet.get_vtxo_key(vtxo).await
685			.map_err(HarkForfeitError::Err)?;
686		input_vtxos.push(protos::InputVtxo {
687			vtxo_id: vtxo.id().to_bytes().to_vec(),
688			attestation: {
689				let attestation = RoundAttemptAttestation::new(
690					event.challenge, vtxo.id(), &signed_reqs, &keypair,
691				);
692				attestation.serialize()
693			},
694		});
695	}
696
697	// Register VTXO transaction chains with server before round participation
698	wallet.register_vtxo_transactions_with_server(&participation.inputs).await
699		.map_err(HarkForfeitError::Err)?;
700
701	let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
702		input_vtxos: input_vtxos,
703		vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
704		#[allow(deprecated)]
705		offboard_requests: vec![],
706		unblinded_mailbox_id: Some(unblinded_mailbox_id.to_vec()),
707	}).await.context("Ark server refused our payment submission")?;
708
709	Ok(AttemptState::AwaitingUnsignedVtxoTree {
710		unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
711		cosign_keys: cosign_keys,
712		secret_nonces: cosign_nonces.into_iter()
713			.map(|(sec, _pub)| sec.into_iter()
714				.map(DangerousSecretNonce::dangerous_from_secret_nonce)
715				.collect())
716			.collect(),
717	})
718}
719
720/// just an internal type; need Error trait to work with anyhow
721#[derive(Debug, thiserror::Error)]
722enum HarkForfeitError {
723	/// An error happened after we sent forfeit signatures to the server
724	#[error("error after forfeits were sent")]
725	SentForfeits(#[source] anyhow::Error),
726	/// An error happened before we sent forfeit signatures to the server
727	#[error("error before forfeits were sent")]
728	Err(#[source] anyhow::Error),
729}
730
731async fn hark_cosign_leaf(
732	wallet: &Wallet,
733	srv: &mut ServerConnection,
734	funding_tx: &Transaction,
735	vtxo: &mut Vtxo<Full>,
736) -> anyhow::Result<()> {
737	let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
738		.context("error fetching keypair").map_err(HarkForfeitError::Err)?
739		.with_context(|| format!(
740			"keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
741		))?.1;
742	let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
743	let cosign_resp = srv.client.request_leaf_vtxo_cosign(
744		protos::LeafVtxoCosignRequest::from(cosign_req),
745	).await
746		.with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
747		.into_inner().try_into()
748		.context("bad leaf vtxo cosign response")?;
749	ensure!(ctx.finalize(vtxo, cosign_resp),
750		"failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
751	);
752
753	Ok(())
754}
755
756/// Finish the hArk VTXO swap protocol
757///
758/// This includes:
759/// - requesting cosignature of the locked hArk leaves
760/// - sending forfeit txs to the server in return for the unlock preimage
761///
762/// NB all the actions in this function are idempotent, meaning that the server
763/// allows them to be done multiple times. this means that if this function calls
764/// fails, it's safe to just call it again at a later time
765async fn hark_vtxo_swap(
766	wallet: &Wallet,
767	participation: &RoundParticipation,
768	output_vtxos: &mut [Vtxo<Full>],
769	funding_tx: &Transaction,
770	unlock_hash: UnlockHash,
771) -> Result<(), HarkForfeitError> {
772	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
773
774	// before we start make sure the server has our input vtxo signatures
775	wallet.register_vtxo_transactions_with_server(&participation.inputs).await
776		.context("couldn't send our input vtxo transactions to server")
777		.map_err(HarkForfeitError::Err)?;
778
779	// first get the leaves signed
780	for vtxo in output_vtxos.iter_mut() {
781		hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
782			.map_err(HarkForfeitError::Err)?;
783	}
784
785	// then do the forfeit dance
786
787	let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
788		unlock_hash: unlock_hash.to_byte_array().to_vec(),
789		vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
790	}).await
791		.context("request forfeits nonces call failed")
792		.map_err(HarkForfeitError::Err)?
793		.into_inner().public_nonces.into_iter()
794		.map(|b| musig::PublicNonce::from_bytes(b))
795		.collect::<Result<Vec<_>, _>>()
796		.context("invalid forfeit nonces")
797		.map_err(HarkForfeitError::Err)?;
798
799	if server_nonces.len() != participation.inputs.len() {
800		return Err(HarkForfeitError::Err(anyhow!(
801			"server sent {} nonce pairs, expected {}",
802			server_nonces.len(), participation.inputs.len(),
803		)));
804	}
805
806	let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
807	for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
808		let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
809			.ok().flatten().with_context(|| format!(
810				"failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
811			)).map_err(HarkForfeitError::Err)?.1;
812		forfeit_bundles.push(HashLockedForfeitBundle::new(
813			input, unlock_hash, &user_key, &nonces,
814		))
815	}
816
817	let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
818		forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
819	}).await
820		.context("forfeit vtxos call failed")
821		.map_err(HarkForfeitError::SentForfeits)?
822		.into_inner().unlock_preimage.as_slice().try_into()
823		.context("invalid preimage length")
824		.map_err(HarkForfeitError::SentForfeits)?;
825
826	for vtxo in output_vtxos.iter_mut() {
827		if !vtxo.provide_unlock_preimage(preimage) {
828			return Err(HarkForfeitError::SentForfeits(anyhow!(
829				"invalid preimage {} for vtxo {} with supposed unlock hash {}",
830				preimage.as_hex(), vtxo.id(), unlock_hash,
831			)));
832		}
833
834		// then validate the vtxo works
835		vtxo.validate(&funding_tx).with_context(|| format!(
836			"new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
837		)).map_err(HarkForfeitError::SentForfeits)?;
838	}
839
840	Ok(())
841}
842
843fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
844	match vtxo.validate(funding_tx) {
845		Err(VtxoValidationError::GenesisTransition {
846			genesis_idx, genesis_len, transition_kind, ..
847		}) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
848		Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
849			vtxo.serialize_hex(),
850		)),
851		Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
852	}
853}
854
855fn check_round_matches_participation(
856	part: &RoundParticipation,
857	new_vtxos: &[Vtxo<Full>],
858	funding_tx: &Transaction,
859) -> anyhow::Result<()> {
860	ensure!(new_vtxos.len() == part.outputs.len(),
861		"unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
862	);
863
864	for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
865		ensure!(vtxo.amount() == req.amount,
866			"unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
867		);
868		ensure!(*vtxo.policy() == req.policy,
869			"unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
870		);
871
872		// We accept the VTXO if only the hArk transition (last) failure happens
873		check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
874	}
875
876	Ok(())
877}
878
879/// Check the confirmation status of a funding tx
880///
881/// Returns true if the funding tx is confirmed deeply enough for us to accept it.
882/// The required number of confirmations depends on the wallet's configuration.
883///
884/// Returns false if the funding tx seems valid but not confirmed yet.
885///
886/// Returns an error if the chain source fails or if we can't submit the tx to the
887/// mempool, suggesting it might be double spent.
888async fn check_funding_tx_confirmations(
889	wallet: &Wallet,
890	funding_txid: Txid,
891	funding_tx: &Transaction,
892) -> anyhow::Result<bool> {
893	let tip = wallet.chain.tip().await.context("chain source error")?;
894	let conf_height = tip - wallet.config.round_tx_required_confirmations + 1;
895	let tx_status = wallet.chain.tx_status(funding_txid).await.context("chain source error")?;
896	trace!("Round funding tx {} confirmation status: {:?} (tip={})",
897		funding_txid, tx_status, tip,
898	);
899	match tx_status {
900		TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
901		TxStatus::Mempool | TxStatus::Confirmed(_) => {
902			if wallet.config.round_tx_required_confirmations == 0 {
903				debug!("Accepting round funding tx without confirmations because of configuration");
904				Ok(true)
905			} else {
906				trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
907				Ok(false)
908			}
909		},
910		TxStatus::NotFound => {
911			// let's try to submit it to our mempool
912			//TODO(stevenroose) change this to an explicit "testmempoolaccept" so that we can
913			// reliably distinguish the cases of our chain source having issues and the tx
914			// actually being rejected which suggests the round was double-spent
915			if let Err(e) = wallet.chain.broadcast_tx(&funding_tx).await {
916				Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
917					funding_txid, serialize_hex(funding_tx), e,
918				))
919			} else {
920				trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
921				Ok(false)
922			}
923		},
924	}
925}
926
927enum HarkProgressResult {
928	RoundPending,
929	RoundNotFound,
930	FundingTxUnconfirmed {
931		funding_txid: Txid,
932	},
933	Ok {
934		funding_tx: Transaction,
935		new_vtxos: Vec<Vtxo<Full>>,
936	},
937}
938
939async fn progress_non_interactive(
940	wallet: &Wallet,
941	participation: &RoundParticipation,
942	unlock_hash: UnlockHash,
943) -> Result<HarkProgressResult, HarkForfeitError> {
944	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
945
946	let resp = match srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
947		unlock_hash: unlock_hash.to_byte_array().to_vec(),
948	}).await {
949		Ok(resp) => resp.into_inner(),
950		Err(err) if err.code() == tonic::Code::NotFound => {
951			return Ok(HarkProgressResult::RoundNotFound);
952		},
953		Err(err) => {
954			return Err(HarkForfeitError::Err(
955				anyhow::Error::from(err).context("error checking round participation status"),
956			));
957		},
958	};
959	let status = protos::RoundParticipationStatus::try_from(resp.status)
960		.context("unknown status from server")
961		.map_err(HarkForfeitError::Err)	?;
962
963	if status == protos::RoundParticipationStatus::RoundPartPending {
964		trace!("Hark round still pending");
965		return Ok(HarkProgressResult::RoundPending);
966	}
967
968	// Since we got here, we clearly don't think we're finished.
969	// So even if the server thinks we did the dance before, we need the
970	// cosignature on the leaf tx so we need to do the dance again.
971	// "Guilty feet have got no rhythm."
972	if status == protos::RoundParticipationStatus::RoundPartReleased {
973		let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
974		warn!("Server says preimage was already released for hArk participation \
975			with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
976		);
977	}
978
979	let funding_tx_bytes = resp.round_funding_tx
980		.context("funding txid should be provided when status is not pending")
981		.map_err(HarkForfeitError::Err)?;
982	let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
983		.context("invalid funding txid")
984		.map_err(HarkForfeitError::Err)?;
985	let funding_txid = funding_tx.compute_txid();
986	trace!("Funding tx for round participation with unlock hash {}: {} ({})",
987		unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
988	);
989
990	// Check the confirmation status of the funding tx
991	match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
992		Ok(true) => {},
993		Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
994		Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
995	}
996
997	let mut new_vtxos = resp.output_vtxos.into_iter()
998		.map(|v| <Vtxo<Full>>::deserialize(&v))
999		.collect::<Result<Vec<_>, _>>()
1000		.context("invalid output VTXOs from server")
1001		.map_err(HarkForfeitError::Err)?;
1002
1003	// Check that the vtxos match our participation in the exact order
1004	check_round_matches_participation(participation, &new_vtxos, &funding_tx)
1005		.context("new VTXOs received from server don't match our participation")
1006		.map_err(HarkForfeitError::Err)?;
1007
1008	hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
1009		.context("error forfeiting hArk VTXOs")
1010		.map_err(HarkForfeitError::SentForfeits)?;
1011
1012	Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
1013}
1014
1015async fn progress_attempt(
1016	state: &AttemptState,
1017	wallet: &Wallet,
1018	part: &RoundParticipation,
1019	event: &RoundEvent,
1020) -> AttemptProgressResult {
1021	// we will match only the states and messages required to make progress,
1022	// all else we ignore, except an unexpected finish
1023
1024	match (state, event) {
1025
1026		(
1027			AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
1028			RoundEvent::VtxoProposal(e),
1029		) => {
1030			trace!("Received VtxoProposal: {:#?}", e);
1031			match sign_vtxo_tree(
1032				wallet,
1033				part,
1034				&cosign_keys,
1035				&secret_nonces,
1036				&e.unsigned_round_tx,
1037				&e.vtxos_spec,
1038				&e.cosign_agg_nonces,
1039			).await {
1040				Ok(()) => {
1041					AttemptProgressResult::Updated {
1042						new_state: AttemptState::AwaitingFinishedRound {
1043							unsigned_round_tx: e.unsigned_round_tx.clone(),
1044							vtxos_spec: e.vtxos_spec.clone(),
1045							unlock_hash: *unlock_hash,
1046						},
1047					}
1048				},
1049				Err(e) => {
1050					trace!("Error signing VTXO tree: {:#}", e);
1051					AttemptProgressResult::Failed(e)
1052				},
1053			}
1054		},
1055
1056		(
1057			AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1058			RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1059		) => {
1060			if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1061				return AttemptProgressResult::Failed(anyhow!(
1062					"signed funding tx ({}) doesn't match tx received before ({})",
1063					signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1064				));
1065			}
1066
1067			if let Err(e) = wallet.chain.broadcast_tx(&signed_round_tx).await {
1068				warn!("Failed to broadcast signed round tx: {:#}", e);
1069			}
1070
1071			match construct_new_vtxos(
1072				part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1073			).await {
1074				Ok(v) => AttemptProgressResult::Finished {
1075					funding_tx: signed_round_tx.clone(),
1076					vtxos: v,
1077					unlock_hash: *unlock_hash,
1078				},
1079				Err(e) => AttemptProgressResult::Failed(anyhow!(
1080					"failed to construct new VTXOs for round: {:#}", e,
1081				)),
1082			}
1083		},
1084
1085		(state, RoundEvent::Finished(RoundFinished { .. })) => {
1086			AttemptProgressResult::Failed(anyhow!(
1087				"unexpectedly received a finished round while we were in state {}",
1088				state.kind(),
1089			))
1090		},
1091
1092		(state, _) => {
1093			trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1094			AttemptProgressResult::NotUpdated
1095		},
1096	}
1097}
1098
1099async fn sign_vtxo_tree(
1100	wallet: &Wallet,
1101	participation: &RoundParticipation,
1102	cosign_keys: &[Keypair],
1103	secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1104	unsigned_round_tx: &Transaction,
1105	vtxo_tree: &VtxoTreeSpec,
1106	cosign_agg_nonces: &[musig::AggregatedNonce],
1107) -> anyhow::Result<()> {
1108	let (srv, _) = wallet.require_server().await.context("server not available")?;
1109
1110	let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1111
1112	// Check that the proposal contains our inputs.
1113	let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1114	for vtxo_req in vtxo_tree.iter_vtxos() {
1115		if let Some(i) = my_vtxos.iter().position(|v| {
1116			v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1117		}) {
1118			my_vtxos.swap_remove(i);
1119		}
1120	}
1121	if !my_vtxos.is_empty() {
1122		bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1123	}
1124
1125	let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1126	let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1127	trace!("Sending vtxo signatures to server...");
1128	let _ = try_join_all(iter.map(|((req, key), sec)| async {
1129		let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1130		let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1131		let part_sigs = unsigned_vtxos.cosign_branch(
1132			&cosign_agg_nonces, leaf_idx, key, secret_nonces,
1133		).context("failed to cosign branch: our request not part of tree")?;
1134
1135		info!("Sending {} partial vtxo cosign signatures for pk {}",
1136			part_sigs.len(), key.public_key(),
1137		);
1138
1139		let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1140			pubkey: key.public_key().serialize().to_vec(),
1141			signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1142		}).await.context("error sending vtxo signatures")?;
1143
1144		Result::<(), anyhow::Error>::Ok(())
1145	})).await.context("error sending VTXO signatures")?;
1146	trace!("Done sending vtxo signatures to server");
1147
1148	Ok(())
1149}
1150
1151async fn construct_new_vtxos(
1152	participation: &RoundParticipation,
1153	unsigned_round_tx: &Transaction,
1154	vtxo_tree: &VtxoTreeSpec,
1155	vtxo_cosign_sigs: &[schnorr::Signature],
1156) -> anyhow::Result<Vec<Vtxo<Full>>> {
1157	let round_txid = unsigned_round_tx.compute_txid();
1158	let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1159	let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1160
1161	// Validate the vtxo tree and cosign signatures.
1162	if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1163		// bad server!
1164		bail!("Received incorrect vtxo cosign signatures from server");
1165	}
1166
1167	let signed_vtxos = vtxo_tree
1168		.into_signed_tree(vtxo_cosign_sigs.to_vec())
1169		.into_cached_tree();
1170
1171	let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1172	let total_nb_expected_vtxos = expected_vtxos.len();
1173
1174	let mut new_vtxos = vec![];
1175	for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1176		if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1177			let vtxo = signed_vtxos.build_vtxo(idx);
1178
1179			// validate the received vtxos
1180			// This is more like a sanity check since we crafted them ourselves.
1181			check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1182				.context("constructed invalid vtxo from tree")?;
1183
1184			info!("New VTXO from round: {} ({}, {})",
1185				vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1186			);
1187
1188			new_vtxos.push(vtxo);
1189			expected_vtxos.swap_remove(expected_idx);
1190		}
1191	}
1192
1193	if !expected_vtxos.is_empty() {
1194		if expected_vtxos.len() == total_nb_expected_vtxos {
1195			// we must have done something wrong
1196			bail!("None of our VTXOs were present in round!");
1197		} else {
1198			bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1199				expected_vtxos.len(), expected_vtxos,
1200			);
1201		}
1202	}
1203	Ok(new_vtxos)
1204}
1205
1206//TODO(stevenroose) should be made idempotent
1207async fn persist_round_success(
1208	wallet: &Wallet,
1209	participation: &RoundParticipation,
1210	movement_id: Option<MovementId>,
1211	new_vtxos: &[Vtxo<Full>],
1212	funding_tx: &Transaction,
1213) -> anyhow::Result<()> {
1214	debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1215		new_vtxos.len(), movement_id,
1216	);
1217
1218	// we first try all actions that need to happen and only afterwards return errors
1219	// so that we achieve maximum success
1220
1221	let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1222		.context("failed to store new VTXOs");
1223	let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1224		.context("failed to mark input VTXOs as spent");
1225	let update_result = if let Some(mid) = movement_id {
1226		wallet.movements.finish_movement_with_update(
1227			mid,
1228			MovementStatus::Successful,
1229			MovementUpdate::new()
1230				.produced_vtxos(new_vtxos)
1231				.metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1232		).await.context("failed to mark movement as finished")
1233	} else {
1234		Ok(())
1235	};
1236
1237	store_result?;
1238	spent_result?;
1239	update_result?;
1240
1241	Ok(())
1242}
1243
1244async fn persist_round_failure(
1245	wallet: &Wallet,
1246	participation: &RoundParticipation,
1247	movement_id: Option<MovementId>,
1248) -> anyhow::Result<()> {
1249	debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1250	let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1251	let finish_result = if let Some(movement_id) = movement_id {
1252		wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
1253	} else {
1254		Ok(())
1255	};
1256	if let Err(e) = &finish_result {
1257		error!("Failed to mark movement as failed: {:#}", e);
1258	}
1259	match (unlock_result, finish_result) {
1260		(Ok(()), Ok(())) => Ok(()),
1261		(Err(e), _) => Err(e),
1262		(_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1263	}
1264}
1265
1266async fn update_funding_txid(
1267	wallet: &Wallet,
1268	movement_id: MovementId,
1269	funding_txid: Txid,
1270) -> anyhow::Result<()> {
1271	wallet.movements.update_movement(
1272		movement_id,
1273		MovementUpdate::new()
1274			.metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1275	).await.context("Unable to update funding txid of round")
1276}
1277
1278impl Wallet {
1279	/// Ask the server when the next round is scheduled to start
1280	pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1281		let (mut srv, _) = self.require_server().await?;
1282		let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1283		Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1284	}
1285
1286	/// Start a new round participation
1287	///
1288	/// This function will store the state in the db and mark the VTXOs as locked.
1289	///
1290	/// ### Return
1291	///
1292	/// - By default, the returned state will be locked to prevent race conditions.
1293	/// To unlock the state, [StoredRoundState::unlock()] can be called.
1294	pub async fn join_next_round(
1295		&self,
1296		participation: RoundParticipation,
1297		movement_kind: Option<RoundMovement>,
1298	) -> anyhow::Result<StoredRoundState> {
1299		let movement_id = if let Some(kind) = movement_kind {
1300			Some(self.movements.new_movement_with_update(
1301				Subsystem::ROUND,
1302				kind.to_string(),
1303				participation.to_movement_update()?
1304			).await?)
1305		} else {
1306			None
1307		};
1308		let state = RoundState::new_interactive(participation, movement_id);
1309
1310		let id = self.db.store_round_state_lock_vtxos(&state).await?;
1311		let state = self.lock_wait_round_state(id).await?
1312			.context("failed to lock fresh round state")?;
1313
1314		Ok(state)
1315	}
1316
1317	/// Join next round in non-interactive or delegated mode
1318	pub async fn join_next_round_delegated(
1319		&self,
1320		participation: RoundParticipation,
1321		movement_kind: Option<RoundMovement>,
1322	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1323		let (mut srv, _) = self.require_server().await?;
1324
1325		let movement_id = if let Some(kind) = movement_kind {
1326			Some(self.movements.new_movement_with_update(
1327				Subsystem::ROUND, kind.to_string(), participation.to_movement_update()?,
1328			).await?)
1329		} else {
1330			None
1331		};
1332
1333		// Get mailbox identifier for VTXO delivery
1334		let unblinded_mailbox_id = self.mailbox_identifier();
1335
1336		// Generate attestations for input vtxos
1337		let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1338		for vtxo in participation.inputs.iter() {
1339			let keypair = self.get_vtxo_key(vtxo).await
1340				.context("failed to get vtxo keypair")?;
1341			input_vtxos.push(protos::InputVtxo {
1342				vtxo_id: vtxo.id().to_bytes().to_vec(),
1343				attestation: {
1344					let attestation = DelegatedRoundParticipationAttestation::new(
1345						vtxo.id(), &participation.outputs, &keypair,
1346					);
1347					attestation.serialize()
1348				},
1349			});
1350		}
1351
1352		// Build proto VtxoRequests
1353		let vtxo_requests = participation.outputs.iter()
1354			.map(|req|
1355				protos::VtxoRequest {
1356					policy: req.policy.serialize(),
1357					amount: req.amount.to_sat(),
1358			})
1359			.collect::<Vec<_>>();
1360
1361		// Submit participation to server and get unlock_hash
1362		let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1363			input_vtxos,
1364			vtxo_requests,
1365			unblinded_mailbox_id: Some(unblinded_mailbox_id.to_vec()),
1366		}).await.context("error submitting round participation to server")?.into_inner();
1367
1368		let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1369			.context("invalid unlock hash from server")?;
1370
1371		let state = RoundState::new_non_interactive(participation, unlock_hash, movement_id);
1372
1373		info!("Non-interactive round participation submitted, it will automatically execute \
1374			when you next sync your wallet after the round happened \
1375			(and has sufficient confirmations).",
1376		);
1377
1378		let id = self.db.store_round_state_lock_vtxos(&state).await?;
1379		Ok(StoredRoundState::new(id, state))
1380	}
1381
1382	/// Get all pending round states
1383	pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1384		self.db.get_pending_round_state_ids().await
1385	}
1386
1387	/// Get all pending round states
1388	pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1389		let ids = self.db.get_pending_round_state_ids().await?;
1390		let mut states = Vec::with_capacity(ids.len());
1391		for id in ids {
1392			if let Some(state) = self.db.get_round_state_by_id(id).await? {
1393				states.push(state);
1394			}
1395		}
1396		Ok(states)
1397	}
1398
1399	/// Balance locked in pending rounds
1400	pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1401		let mut ret = Amount::ZERO;
1402		for round in self.pending_round_states().await? {
1403			ret += round.state().pending_balance();
1404		}
1405		Ok(ret)
1406	}
1407
1408	/// Returns all VTXOs that are locked in a pending round
1409	///
1410	/// This excludes all input VTXOs for which the output VTXOs have already
1411	/// been created.
1412	pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1413		let mut ret = Vec::new();
1414		for round in self.pending_round_states().await? {
1415			let inputs = round.state().locked_pending_inputs();
1416			ret.reserve(inputs.len());
1417			for input in inputs {
1418				let v = self.get_vtxo_by_id(input.id()).await
1419					.context("unknown round input VTXO")?;
1420				ret.push(v);
1421			}
1422		}
1423		Ok(ret)
1424	}
1425
1426	/// Sync pending rounds that have finished but are waiting for confirmations
1427	pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1428		let states = self.pending_round_states().await?;
1429		if states.is_empty() {
1430			return Ok(());
1431		}
1432
1433		debug!("Syncing {} pending round states...", states.len());
1434
1435		tokio_stream::iter(states).for_each_concurrent(10, |state| async move {
1436			// not processing events here
1437			if state.state().ongoing_participation() {
1438				return;
1439			}
1440
1441			let mut state = match self.lock_wait_round_state(state.id()).await {
1442				Ok(Some(state)) => state,
1443				Ok(None) => return,
1444				Err(e) => {
1445					warn!("Error locking round state: {:#}", e);
1446					return;
1447				},
1448			};
1449
1450			let status = state.state_mut().sync(self).await;
1451			trace!("Synced round #{}, status: {:?}", state.id(), status);
1452			match status {
1453				Ok(RoundStatus::Confirmed { funding_txid }) => {
1454					info!("Round confirmed. Funding tx {}", funding_txid);
1455					if let Err(e) = self.db.remove_round_state(&state).await {
1456						warn!("Error removing confirmed round state from db: {:#}", e);
1457					}
1458				},
1459				Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1460					info!("Waiting for confirmations for round funding tx {}", funding_txid);
1461					if let Err(e) = self.db.update_round_state(&state).await {
1462						warn!("Error updating pending round state in db: {:#}", e);
1463					}
1464				},
1465				Ok(RoundStatus::Pending) => {
1466					if let Err(e) = self.db.update_round_state(&state).await {
1467						warn!("Error updating pending round state in db: {:#}", e);
1468					}
1469				},
1470				Ok(RoundStatus::Failed { error }) => {
1471					error!("Round failed: {}", error);
1472					if let Err(e) = self.db.remove_round_state(&state).await {
1473						warn!("Error removing failed round state from db: {:#}", e);
1474					}
1475				},
1476				Ok(RoundStatus::Canceled) => {
1477					error!("Round canceled");
1478					if let Err(e) = self.db.remove_round_state(&state).await {
1479						warn!("Error removing canceled round state from db: {:#}", e);
1480					}
1481				},
1482				Err(e) => warn!("Error syncing round: {:#}", e),
1483			}
1484		}).await;
1485
1486		Ok(())
1487	}
1488
1489	/// Fetch last round event from server
1490	async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1491		let (mut srv, _) = self.require_server().await?;
1492		let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1493		Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1494	}
1495
1496	async fn inner_process_event(
1497		&self,
1498		state: &mut StoredRoundState,
1499		event: Option<&RoundEvent>,
1500	) {
1501		if let Some(event) = event && state.state().ongoing_participation() {
1502			let updated = state.state_mut().process_event(self, &event).await;
1503			if updated {
1504				if let Err(e) = self.db.update_round_state(&state).await {
1505					error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1506				}
1507			}
1508		}
1509
1510		match state.state_mut().sync(self).await {
1511			Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1512			Ok(s) if s.is_final() => {
1513				info!("Round #{} finished with result: {:?}", state.id(), s);
1514				if let Err(e) = self.db.remove_round_state(&state).await {
1515					warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1516				}
1517			},
1518			Ok(s) => {
1519				trace!("Round state #{} is now in state {:?}", state.id(), s);
1520				if let Err(e) = self.db.update_round_state(&state).await {
1521					warn!("Error storing round state #{}: {:#}", state.id(), e);
1522				}
1523			},
1524		}
1525	}
1526
1527	/// Try to make incremental progress on all pending round states
1528	///
1529	/// If the `last_round_event` argument is not provided, it will be fetched
1530	/// from the server.
1531	pub async fn progress_pending_rounds(
1532		&self,
1533		last_round_event: Option<&RoundEvent>,
1534	) -> anyhow::Result<()> {
1535		let states = self.pending_round_states().await?;
1536		if states.is_empty() {
1537			return Ok(());
1538		}
1539
1540		info!("Processing {} rounds...", states.len());
1541
1542		let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1543
1544		let has_ongoing_participation = states.iter()
1545			.any(|s| s.state().ongoing_participation());
1546		if has_ongoing_participation && last_round_event.is_none() {
1547			match self.get_last_round_event().await {
1548				Ok(e) => last_round_event = Some(Cow::Owned(e)),
1549				Err(e) => {
1550					warn!("Error fetching round event, \
1551						failed to progress ongoing rounds: {:#}", e);
1552				},
1553			}
1554		}
1555
1556		let event = last_round_event.as_ref().map(|c| c.as_ref());
1557
1558		let futs = states.into_iter().map(async |state| {
1559			let locked = self.lock_wait_round_state(state.id()).await?;
1560			if let Some(mut locked) = locked {
1561				self.inner_process_event(&mut locked, event).await;
1562			}
1563			Ok::<_, anyhow::Error>(())
1564		});
1565
1566		futures::future::join_all(futs).await;
1567
1568		Ok(())
1569	}
1570
1571	pub async fn subscribe_round_events(&self)
1572		-> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1573	{
1574		let (mut srv, _) = self.require_server().await?;
1575		let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1576			.into_inner().map(|m| {
1577				let m = m.context("received error on event stream")?;
1578				let e = RoundEvent::try_from(m.clone())
1579					.with_context(|| format!("error converting rpc round event: {:?}", m))?;
1580				trace!("Received round event: {}", e);
1581				Ok::<_, anyhow::Error>(e)
1582			});
1583		Ok(events)
1584	}
1585
1586	/// A blocking call that will try to perform a full round participation
1587	/// for all ongoing rounds
1588	///
1589	/// Returns only once there is no ongoing rounds anymore.
1590	pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1591		let mut events = self.subscribe_round_events().await?;
1592
1593		loop {
1594			// NB: we need to load all ongoing rounds on every iteration here
1595			// because some might be finished by another call
1596			let state_ids = self.pending_round_states().await?.iter()
1597				.filter(|s| s.state().ongoing_participation())
1598				.map(|s| s.id())
1599				.collect::<Vec<_>>();
1600
1601			if state_ids.is_empty() {
1602				info!("All rounds handled");
1603				return Ok(());
1604			}
1605
1606			let event = events.next().await
1607				.context("events stream broke")?
1608				.context("error on event stream")?;
1609
1610			let futs = state_ids.into_iter().map(async |state| {
1611				let locked = self.lock_wait_round_state(state).await?;
1612				if let Some(mut locked) = locked {
1613					self.inner_process_event(&mut locked, Some(&event)).await;
1614				}
1615				Ok::<_, anyhow::Error>(())
1616			});
1617
1618			futures::future::join_all(futs).await;
1619		}
1620	}
1621
1622	/// Will cancel all pending rounds that can safely be canceled
1623	///
1624	/// All rounds that have not started yet can safely be canceled,
1625	/// as well as rounds where we have not yet signed any forfeit txs.
1626	pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1627		// initial load to get all pending round states ids
1628		let state_ids = self.db.get_pending_round_state_ids().await?;
1629
1630		let futures = state_ids.into_iter().map(|state_id| {
1631			async move {
1632				// wait for lock and load again to ensure most recent state
1633				let mut state = match self.lock_wait_round_state(state_id).await {
1634					Ok(Some(s)) => s,
1635					Ok(None) => return,
1636					Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1637				};
1638
1639				match state.state_mut().try_cancel(self).await {
1640					Ok(true) => {
1641						if let Err(e) = self.db.remove_round_state(&state).await {
1642							warn!("Error removing canceled round state from db: {:#}", e);
1643						}
1644					},
1645					Ok(false) => {},
1646					Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1647				}
1648			}
1649		});
1650
1651		join_all(futures).await;
1652
1653		Ok(())
1654	}
1655
1656	/// Try to cancel the given round
1657	pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1658		let mut state = self.lock_wait_round_state(id).await?
1659			.context("round state not found")?;
1660
1661		if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1662			self.db.remove_round_state(&state).await
1663				.context("error removing canceled round state from db")?;
1664		} else {
1665			bail!("failed to cancel round");
1666		}
1667
1668		Ok(())
1669	}
1670
1671	/// Participate in a round
1672	///
1673	/// This function will start a new round participation and block until
1674	/// the round is finished.
1675	/// After this method returns the round state will be kept active until
1676	/// the round tx fully confirms.
1677	pub(crate) async fn participate_round(
1678		&self,
1679		participation: RoundParticipation,
1680		movement_kind: Option<RoundMovement>,
1681	) -> anyhow::Result<RoundStatus> {
1682		let mut state = self.join_next_round(participation, movement_kind).await?;
1683
1684		info!("Waiting for a round start...");
1685		let mut events = self.subscribe_round_events().await?;
1686
1687		loop {
1688			if !state.state().ongoing_participation() {
1689				let status = state.state_mut().sync(self).await?;
1690				match status {
1691					RoundStatus::Failed { error } => bail!("round failed: {}", error),
1692					RoundStatus::Canceled => bail!("round canceled"),
1693					status => return Ok(status),
1694				}
1695			}
1696
1697			let event = events.next().await
1698				.context("events stream broke")?
1699				.context("error on event stream")?;
1700			if state.state_mut().process_event(self, &event).await {
1701				self.db.update_round_state(&state).await?;
1702			}
1703		}
1704	}
1705}