Skip to main content

bark/round/
mod.rs

1//!
2//! Round State Machine
3//!
4
5mod lock;
6
7pub(crate) use lock::{RoundStateGuard, RoundStateLockIndex};
8
9use std::iter;
10use std::borrow::Cow;
11use std::convert::Infallible;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use anyhow::Context;
15use ark::vtxo::VtxoValidationError;
16use bdk_esplora::esplora_client::Amount;
17use bip39::rand;
18use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
19use bitcoin::consensus::encode::{deserialize, serialize_hex};
20use bitcoin::hashes::Hash;
21use bitcoin::hex::DisplayHex;
22use bitcoin::key::Keypair;
23use bitcoin::secp256k1::schnorr;
24use futures::future::{join_all, try_join_all};
25use futures::{Stream, StreamExt};
26use log::{debug, error, info, trace, warn};
27
28use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
29use ark::vtxo::Full;
30use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
31use ark::mailbox::MailboxIdentifier;
32use ark::forfeit::HashLockedForfeitBundle;
33use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
34use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
35use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
36use bitcoin_ext::TxStatus;
37use server_rpc::{protos, ServerConnection, TryFromBytes};
38
39use crate::{SECP, Wallet, WalletVtxo};
40use crate::movement::{MovementId, MovementStatus};
41use crate::movement::update::MovementUpdate;
42use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
43use crate::subsystem::{RoundMovement, Subsystem};
44
45
/// Transition kind string identifying the hArk leaf transition
///
/// VTXO validation errors carry a transition kind; it is compared against
/// this value to recognize a failure of the hash-locked hArk leaf.
const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
48
49/// Struct to communicate your specific participation for an Ark round.
50#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct RoundParticipation {
52	#[serde(with = "ark::encode::serde::vec")]
53	pub inputs: Vec<Vtxo<Full>>,
54	/// The output VTXOs that we request in the round,
55	/// including change
56	pub outputs: Vec<VtxoRequest>,
57	/// Optional mailbox identifier for round completion notification
58	#[serde(default, skip_serializing_if = "Option::is_none")]
59	pub unblinded_mailbox_id: Option<MailboxIdentifier>,
60}
61
62impl RoundParticipation {
63	pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
64		let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
65		let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
66		let fee = input_amount - output_amount;
67		Ok(MovementUpdate::new()
68			.consumed_vtxos(&self.inputs)
69			.intended_balance(SignedAmount::ZERO)
70			.effective_balance( - fee.to_signed()?)
71			.fee(fee)
72		)
73	}
74}
75
76#[derive(Debug, Clone)]
77pub enum RoundStatus {
78	/// The round was successful and is fully confirmed
79	Confirmed {
80		funding_txid: Txid,
81	},
82	/// Round successful but not fully confirmed
83	Unconfirmed {
84		funding_txid: Txid,
85	},
86	/// Round didn't finish yet
87	Pending,
88	/// The round failed
89	Failed {
90		error: String,
91	},
92	/// User canceled the round
93	Canceled,
94}
95
96impl RoundStatus {
97	/// Whether this is the final state and it won't change anymore
98	pub fn is_final(&self) -> bool {
99		match self {
100			Self::Confirmed { .. } => true,
101			Self::Unconfirmed { .. } => false,
102			Self::Pending => false,
103			Self::Failed { .. } => true,
104			Self::Canceled => true,
105		}
106	}
107
108	/// Whether it looks like the round succeeded
109	pub fn is_success(&self) -> bool {
110		match self {
111			Self::Confirmed { .. } => true,
112			Self::Unconfirmed { .. } => true,
113			Self::Pending => false,
114			Self::Failed { .. } => false,
115			Self::Canceled => false,
116		}
117	}
118}
119
120/// State of the progress of a round participation
121///
122/// An instance of this struct is kept all the way from the intention of joining
123/// the next round, until either the round fully confirms or it fails and we are
124/// sure it won't have any effect on our wallet.
125///
126/// As soon as we have signed forfeit txs for the round, we keep track of this
127/// round attempt until we see another attempt we participated in confirm or
128/// we gain confidence that the failed attempt will never confirm.
129//
130//TODO(stevenroose) move the id in here and have the state persist itself with the wallet
131// to have better control. this way we can touch db before we sent forfeit sigs
132pub struct RoundState {
133	/// Round is fully done
134	pub(crate) done: bool,
135
136	/// Our participation in this round
137	pub(crate) participation: RoundParticipation,
138
139	/// The flow of the round in case it is still ongoing with the server
140	pub(crate) flow: RoundFlowState,
141
142	/// The new output vtxos of this round participation
143	///
144	/// After we finish the interactive part, we fill this with the uncompleted
145	/// VTXOs which we then try to complete with the unlock preimage.
146	pub(crate) new_vtxos: Vec<Vtxo<Full>>,
147
148	/// Whether we sent our forfeit signatures to the server
149	///
150	/// If we did this and the server refused to reveal our new VTXOs,
151	/// we will be forced to exit.
152	//TODO(stevenroose) implement exit when this is true and we can't make progress
153	// probably based on the input vtxos becoming close to expiry
154	pub(crate) sent_forfeit_sigs: bool,
155
156	/// The ID of the [Movement] associated with this round
157	pub(crate) movement_id: Option<MovementId>,
158}
159
160impl RoundState {
161	fn new_interactive(
162		participation: RoundParticipation,
163		movement_id: Option<MovementId>,
164	) -> Self {
165		Self {
166			participation,
167			movement_id,
168			flow: RoundFlowState::InteractivePending,
169			new_vtxos: Vec::new(),
170			sent_forfeit_sigs: false,
171			done: false,
172		}
173	}
174
175	fn new_non_interactive(
176		participation: RoundParticipation,
177		unlock_hash: UnlockHash,
178		movement_id: Option<MovementId>,
179	) -> Self {
180		Self {
181			participation,
182			movement_id,
183			flow: RoundFlowState::NonInteractivePending { unlock_hash },
184			new_vtxos: Vec::new(),
185			sent_forfeit_sigs: false,
186			done: false,
187		}
188	}
189
190	/// Our participation in this round
191	pub fn participation(&self) -> &RoundParticipation {
192		&self.participation
193	}
194
195	/// the unlock hash if already known
196	pub fn unlock_hash(&self) -> Option<UnlockHash> {
197		match self.flow {
198			RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
199			RoundFlowState::InteractivePending => None,
200			RoundFlowState::InteractiveOngoing { .. } => None,
201			RoundFlowState::Failed { .. } => None,
202			RoundFlowState::Canceled => None,
203			RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
204		}
205	}
206
207	pub fn funding_tx(&self) -> Option<&Transaction> {
208		match self.flow {
209			RoundFlowState::NonInteractivePending { .. } => None,
210			RoundFlowState::InteractivePending => None,
211			RoundFlowState::InteractiveOngoing { .. } => None,
212			RoundFlowState::Failed { .. } => None,
213			RoundFlowState::Canceled => None,
214			RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
215		}
216	}
217
218	/// Whether the interactive part of the round is still ongoing
219	pub fn ongoing_participation(&self) -> bool {
220		match self.flow {
221			RoundFlowState::NonInteractivePending { .. } => false,
222			RoundFlowState::InteractivePending => true,
223			RoundFlowState::InteractiveOngoing { .. } => true,
224			RoundFlowState::Failed { .. } => false,
225			RoundFlowState::Canceled => false,
226			RoundFlowState::Finished { .. } => false,
227		}
228	}
229
230	/// Tries to cancel the round and returns whether it was succesfully canceled
231	/// or if it was already canceled or failed
232	pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
233		let ret = match self.flow {
234			RoundFlowState::NonInteractivePending { .. } => todo!("we have to cancel with server!"),
235			RoundFlowState::Canceled => true,
236			RoundFlowState::Failed { .. } => true,
237			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
238				self.flow = RoundFlowState::Canceled;
239				true
240			},
241			RoundFlowState::Finished { .. } => false,
242		};
243		if ret {
244			persist_round_failure(wallet, &self.participation, self.movement_id).await
245				.context("failed to persist round failure for cancelation")?;
246		}
247		Ok(ret)
248	}
249
250	async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
251		match start_attempt(wallet, &self.participation, attempt).await {
252			Ok(state) => {
253				self.flow = RoundFlowState::InteractiveOngoing {
254					round_seq: attempt.round_seq,
255					attempt_seq: attempt.attempt_seq,
256					state: state,
257				};
258			},
259			Err(e) => {
260				self.flow = RoundFlowState::Failed {
261					error: format!("{:#}", e),
262				};
263			},
264		}
265	}
266
267	/// Processes the given event and returns true if some update was made to the state
268	pub async fn process_event(
269		&mut self,
270		wallet: &Wallet,
271		event: &RoundEvent,
272	) -> bool {
273		let _: Infallible = match self.flow {
274			RoundFlowState::InteractivePending => {
275				if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
276					trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
277					self.try_start_attempt(wallet, e).await;
278					return true;
279				} else {
280					trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
281						event.kind(), event.round_seq(), event.attempt_seq(),
282					);
283					return false;
284				}
285			},
286			RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
287				// here we catch the cases where we're in a wrong flow
288
289				if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
290					warn!("Round {} failed by server", round_seq);
291					self.flow = RoundFlowState::Failed {
292						error: format!("round {} failed by server", round_seq),
293					};
294					return true;
295				}
296
297				if event.round_seq() > round_seq {
298					// new round started, we don't support multiple parallel rounds,
299					// this means we failed
300					self.flow = RoundFlowState::Failed {
301						error: format!("round {} started while we were on {}",
302							event.round_seq(), round_seq,
303						),
304					};
305					return true;
306				}
307
308				if event.attempt_seq() < attempt_seq {
309					trace!("ignoring replayed message from old attempt");
310					return false;
311				}
312
313				if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
314					trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
315					self.try_start_attempt(wallet, e).await;
316					return true;
317				}
318				trace!("Processing event {} for round attempt {}:{} in state {}",
319					event.kind(), round_seq, attempt_seq, state.kind(),
320				);
321
322				return match progress_attempt(state, wallet, &self.participation, event).await {
323					AttemptProgressResult::NotUpdated => false,
324					AttemptProgressResult::Updated { new_state } => {
325						*state = new_state;
326						true
327					},
328					AttemptProgressResult::Failed(e) => {
329						warn!("Round failed with error: {:#}", e);
330						self.flow = RoundFlowState::Failed {
331							error: format!("{:#}", e),
332						};
333						true
334					},
335					AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
336						self.new_vtxos = vtxos;
337						let funding_txid = funding_tx.compute_txid();
338						self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
339						if let Some(mid) = self.movement_id {
340							if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
341								warn!("Error updating the round funding txid: {:#}", e);
342							}
343						}
344						true
345					},
346				};
347			},
348			RoundFlowState::NonInteractivePending { .. }
349				| RoundFlowState::Finished { .. }
350				| RoundFlowState::Failed { .. }
351				| RoundFlowState::Canceled => return false,
352		};
353	}
354
355	/// Sync the round's status and return it
356	///
357	/// When success or failure is returned, the round state can be eliminated
358	//TODO(stevenroose) make RoundState manage its own db record
359	pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
360		match self.flow {
361			RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
362				Ok(RoundStatus::Confirmed {
363					funding_txid: funding_tx.compute_txid(),
364				})
365			},
366
367			RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
368				Ok(RoundStatus::Pending)
369			},
370			RoundFlowState::Failed { ref error } => {
371				persist_round_failure(wallet, &self.participation, self.movement_id).await
372					.context("failed to persist round failure")?;
373				Ok(RoundStatus::Failed { error: error.clone() })
374			},
375			RoundFlowState::Canceled => {
376				persist_round_failure(wallet, &self.participation, self.movement_id).await
377					.context("failed to persist round failure")?;
378				Ok(RoundStatus::Canceled)
379			},
380
381			RoundFlowState::NonInteractivePending { unlock_hash } => {
382				match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
383					Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
384					Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
385						let funding_txid = funding_tx.compute_txid();
386						self.new_vtxos = new_vtxos;
387						self.flow = RoundFlowState::Finished {
388							funding_tx: funding_tx.clone(),
389							unlock_hash: unlock_hash,
390						};
391
392						persist_round_success(
393							wallet,
394							&self.participation,
395							self.movement_id,
396							&self.new_vtxos,
397							&funding_tx,
398						).await.context("failed to store successful round in DB!")?;
399
400						self.done = true;
401
402						Ok(RoundStatus::Confirmed { funding_txid })
403					},
404					Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
405						if let Some(mid) = self.movement_id {
406							update_funding_txid(wallet, mid, funding_txid).await
407								.context("failed to update funding txid in DB")?;
408						}
409						Ok(RoundStatus::Unconfirmed { funding_txid })
410					},
411
412					//TODO(stevenroose) should we mark as failed for these cases?
413
414					Err(HarkForfeitError::Err(e)) => {
415						//TODO(stevenroose) we failed here but we might actualy be able to
416						// succeed if we retry. should we implement some kind of limited
417						// retry after which we mark as failed?
418						Err(e.context("error progressing non-interactive round"))
419					},
420					Err(HarkForfeitError::SentForfeits(e)) => {
421						self.sent_forfeit_sigs = true;
422						Err(e.context("error progressing non-interactive round \
423							after sending forfeit tx signatures"))
424					},
425				}
426			},
427			// interactive part finished, but didn't forfeit yet
428			RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
429				let funding_txid = funding_tx.compute_txid();
430				let confirmed = check_funding_tx_confirmations(
431					wallet, funding_txid, &funding_tx,
432				).await.context("error checking funding tx confirmations")?;
433				if !confirmed {
434					trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
435					return Ok(RoundStatus::Unconfirmed { funding_txid });
436				}
437
438				match hark_vtxo_swap(
439					wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
440				).await {
441					Ok(()) => {
442						persist_round_success(
443							wallet,
444							&self.participation,
445							self.movement_id,
446							&self.new_vtxos,
447							&funding_tx,
448						).await.context("failed to store successful round in DB!")?;
449
450						self.done = true;
451
452						Ok(RoundStatus::Confirmed { funding_txid })
453					},
454					Err(HarkForfeitError::Err(e)) => {
455						Err(e.context("error forfeiting VTXOs after round"))
456					},
457					Err(HarkForfeitError::SentForfeits(e)) => {
458						self.sent_forfeit_sigs = true;
459						Err(e.context("error after having signed and sent \
460							forfeit signatures to server"))
461					},
462				}
463			},
464		}
465	}
466
467	/// Once we know the signed round funding tx, this returns the output VTXOs
468	/// for this round.
469	pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
470		if self.new_vtxos.is_empty() {
471			None
472		} else {
473			Some(&self.new_vtxos)
474		}
475	}
476
477	/// Returns the input VTXOs that are locked in this round, but only
478	/// if no output VTXOs were issued yet.
479	pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
480		//TODO(stevenroose) consider if we can't just drop the state after forfeit exchange
481		match self.flow {
482			RoundFlowState::NonInteractivePending { .. }
483				| RoundFlowState::InteractivePending
484				| RoundFlowState::InteractiveOngoing { .. }
485			=> {
486				&self.participation.inputs
487			},
488			RoundFlowState::Finished { .. } => if self.done {
489				// inputs already unlocked
490				&[]
491			} else {
492				&self.participation.inputs
493			},
494			RoundFlowState::Failed { .. }
495				| RoundFlowState::Canceled
496			=> {
497				// inputs already unlocked
498				&[]
499			},
500		}
501	}
502
503	/// The balance pending in this round
504	///
505	/// This becomes zero once the new round VTXOs are unlocked.
506	pub fn pending_balance(&self) -> Amount {
507		if self.done {
508			return Amount::ZERO;
509		}
510
511		match self.flow {
512			RoundFlowState::NonInteractivePending { .. }
513				| RoundFlowState::InteractivePending
514				| RoundFlowState::InteractiveOngoing { .. }
515				| RoundFlowState::Finished { .. }
516			=> {
517				self.participation.outputs.iter().map(|o| o.amount).sum()
518			},
519			RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
520				Amount::ZERO
521			},
522		}
523	}
524}
525
526/// The state of the process flow of a round
527///
528/// This tracks the progress of the interactive part of the round, from
529/// waiting to start until finishing either succesfully or with a failure.
530pub enum RoundFlowState {
531	/// We don't do flow and we just wait for the round to finish
532	NonInteractivePending {
533		unlock_hash: UnlockHash,
534	},
535
536	/// Waiting for round to happen
537	InteractivePending,
538	/// Interactive part ongoing
539	InteractiveOngoing {
540		round_seq: RoundSeq,
541		attempt_seq: usize,
542		state: AttemptState,
543	},
544
545	/// Interactive part finished, waiting for confirmation
546	Finished {
547		funding_tx: Transaction,
548		unlock_hash: UnlockHash,
549	},
550
551	/// Failed during round
552	Failed {
553		error: String,
554	},
555
556	/// User canceled round
557	Canceled,
558}
559
560/// The state of a single round attempt
561///
562/// For each attempt that we participate in, we keep the state of our concrete
563/// participation.
564pub enum AttemptState {
565	AwaitingAttempt,
566	AwaitingUnsignedVtxoTree {
567		cosign_keys: Vec<Keypair>,
568		secret_nonces: Vec<Vec<DangerousSecretNonce>>,
569		unlock_hash: UnlockHash,
570	},
571	AwaitingFinishedRound {
572		unsigned_round_tx: Transaction,
573		vtxos_spec: VtxoTreeSpec,
574		unlock_hash: UnlockHash,
575	},
576}
577
578impl AttemptState {
579	/// The state kind represented as a string
580	fn kind(&self) -> &'static str {
581		match self {
582			Self::AwaitingAttempt => "AwaitingAttempt",
583			Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
584			Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
585		}
586	}
587}
588
589/// Result from trying to progress an ongoing round attempt
590enum AttemptProgressResult {
591	Finished {
592		funding_tx: Transaction,
593		vtxos: Vec<Vtxo<Full>>,
594		unlock_hash: UnlockHash,
595	},
596	Failed(anyhow::Error),
597	/// When the state changes, this variant is returned
598	///
599	/// If during the processing, we have signed any forfeit txs and tried
600	/// sending them to the server, the [UnconfirmedRound] instance is returned
601	/// so that it can be stored in the state.
602	Updated {
603		new_state: AttemptState,
604	},
605	NotUpdated,
606}
607
608/// Participate in the new round attempt by submitting our round participation
609async fn start_attempt(
610	wallet: &Wallet,
611	participation: &RoundParticipation,
612	event: &RoundAttempt,
613) -> anyhow::Result<AttemptState> {
614	let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
615
616	// Assign cosign pubkeys to the payment requests.
617	let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
618		.take(participation.outputs.len())
619		.collect::<Vec<_>>();
620
621	// Prepare round participation info.
622	// For each of our requested vtxo output, we need a set of public and secret nonces.
623	let cosign_nonces = cosign_keys.iter()
624		.map(|key| {
625			let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
626			let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
627			for _ in 0..ark_info.nb_round_nonces {
628				let (s, p) = musig::nonce_pair(key);
629				secs.push(s);
630				pubs.push(p);
631			}
632			(secs, pubs)
633		})
634		.take(participation.outputs.len())
635		.collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
636
637
638	// The round has now started. We can submit our payment.
639	debug!("Submitting payment request with {} inputs and {} vtxo outputs",
640		participation.inputs.len(), participation.outputs.len(),
641	);
642
643	// Build signed requests with mailbox IDs
644	let unblinded_mailbox_id = wallet.mailbox_identifier();
645	let signed_reqs = participation.outputs.iter()
646		.zip(cosign_keys.iter())
647		.zip(cosign_nonces.iter())
648		.map(|((req, cosign_key), (_sec, pub_nonces))| {
649			SignedVtxoRequest {
650				vtxo: req.clone(),
651				cosign_pubkey: cosign_key.public_key(),
652				nonces: pub_nonces.clone(),
653			}
654		})
655		.collect::<Vec<_>>();
656
657	let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
658	for vtxo in participation.inputs.iter() {
659		let keypair = wallet.get_vtxo_key(vtxo).await
660			.map_err(HarkForfeitError::Err)?;
661		input_vtxos.push(protos::InputVtxo {
662			vtxo_id: vtxo.id().to_bytes().to_vec(),
663			attestation: {
664				let attestation = RoundAttemptAttestation::new(
665					event.challenge, vtxo.id(), &signed_reqs, &keypair,
666				);
667				attestation.serialize()
668			},
669		});
670	}
671
672	// Register VTXOs with server before round participation
673	wallet.register_vtxos_with_server(&participation.inputs).await
674		.map_err(HarkForfeitError::Err)?;
675
676	let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
677		input_vtxos: input_vtxos,
678		vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
679		#[allow(deprecated)]
680		offboard_requests: vec![],
681		unblinded_mailbox_id: Some(unblinded_mailbox_id.to_vec()),
682	}).await.context("Ark server refused our payment submission")?;
683
684	Ok(AttemptState::AwaitingUnsignedVtxoTree {
685		unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
686		cosign_keys: cosign_keys,
687		secret_nonces: cosign_nonces.into_iter()
688			.map(|(sec, _pub)| sec.into_iter()
689				.map(DangerousSecretNonce::dangerous_from_secret_nonce)
690				.collect())
691			.collect(),
692	})
693}
694
695/// just an internal type; need Error trait to work with anyhow
696#[derive(Debug, thiserror::Error)]
697enum HarkForfeitError {
698	/// An error happened after we sent forfeit signatures to the server
699	#[error("error after forfeits were sent")]
700	SentForfeits(#[source] anyhow::Error),
701	/// An error happened before we sent forfeit signatures to the server
702	#[error("error before forfeits were sent")]
703	Err(#[source] anyhow::Error),
704}
705
706async fn hark_cosign_leaf(
707	wallet: &Wallet,
708	srv: &mut ServerConnection,
709	funding_tx: &Transaction,
710	vtxo: &mut Vtxo<Full>,
711) -> anyhow::Result<()> {
712	let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
713		.context("error fetching keypair").map_err(HarkForfeitError::Err)?
714		.with_context(|| format!(
715			"keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
716		))?.1;
717	let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
718	let cosign_resp = srv.client.request_leaf_vtxo_cosign(
719		protos::LeafVtxoCosignRequest::from(cosign_req),
720	).await
721		.with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
722		.into_inner().try_into()
723		.context("bad leaf vtxo cosign response")?;
724	ensure!(ctx.finalize(vtxo, cosign_resp),
725		"failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
726	);
727
728	Ok(())
729}
730
731/// Finish the hArk VTXO swap protocol
732///
733/// This includes:
734/// - requesting cosignature of the locked hArk leaves
735/// - sending forfeit txs to the server in return for the unlock preimage
736///
737/// NB all the actions in this function are idempotent, meaning that the server
738/// allows them to be done multiple times. this means that if this function calls
739/// fails, it's safe to just call it again at a later time
740async fn hark_vtxo_swap(
741	wallet: &Wallet,
742	participation: &RoundParticipation,
743	output_vtxos: &mut [Vtxo<Full>],
744	funding_tx: &Transaction,
745	unlock_hash: UnlockHash,
746) -> Result<(), HarkForfeitError> {
747	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
748
749	// before we start make sure the server has our input vtxo signatures
750	wallet.register_vtxos_with_server(&participation.inputs).await
751		.context("couldn't send our input vtxos to server")
752		.map_err(HarkForfeitError::Err)?;
753
754	// first get the leaves signed
755	for vtxo in output_vtxos.iter_mut() {
756		hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
757			.map_err(HarkForfeitError::Err)?;
758	}
759
760	// then do the forfeit dance
761
762	let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
763		unlock_hash: unlock_hash.to_byte_array().to_vec(),
764		vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
765	}).await
766		.context("request forfeits nonces call failed")
767		.map_err(HarkForfeitError::Err)?
768		.into_inner().public_nonces.into_iter()
769		.map(|b| musig::PublicNonce::from_bytes(b))
770		.collect::<Result<Vec<_>, _>>()
771		.context("invalid forfeit nonces")
772		.map_err(HarkForfeitError::Err)?;
773
774	if server_nonces.len() != participation.inputs.len() {
775		return Err(HarkForfeitError::Err(anyhow!(
776			"server sent {} nonce pairs, expected {}",
777			server_nonces.len(), participation.inputs.len(),
778		)));
779	}
780
781	let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
782	for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
783		let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
784			.ok().flatten().with_context(|| format!(
785				"failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
786			)).map_err(HarkForfeitError::Err)?.1;
787		forfeit_bundles.push(HashLockedForfeitBundle::new(
788			input, unlock_hash, &user_key, &nonces,
789		))
790	}
791
792	let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
793		forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
794	}).await
795		.context("forfeit vtxos call failed")
796		.map_err(HarkForfeitError::SentForfeits)?
797		.into_inner().unlock_preimage.as_slice().try_into()
798		.context("invalid preimage length")
799		.map_err(HarkForfeitError::SentForfeits)?;
800
801	for vtxo in output_vtxos.iter_mut() {
802		if !vtxo.provide_unlock_preimage(preimage) {
803			return Err(HarkForfeitError::SentForfeits(anyhow!(
804				"invalid preimage {} for vtxo {} with supposed unlock hash {}",
805				preimage.as_hex(), vtxo.id(), unlock_hash,
806			)));
807		}
808
809		// then validate the vtxo works
810		vtxo.validate(&funding_tx).with_context(|| format!(
811			"new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
812		)).map_err(HarkForfeitError::SentForfeits)?;
813	}
814
815	Ok(())
816}
817
818fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
819	match vtxo.validate(funding_tx) {
820		Err(VtxoValidationError::GenesisTransition {
821			genesis_idx, genesis_len, transition_kind, ..
822		}) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
823		Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
824			vtxo.serialize_hex(),
825		)),
826		Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
827	}
828}
829
830fn check_round_matches_participation(
831	part: &RoundParticipation,
832	new_vtxos: &[Vtxo<Full>],
833	funding_tx: &Transaction,
834) -> anyhow::Result<()> {
835	ensure!(new_vtxos.len() == part.outputs.len(),
836		"unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
837	);
838
839	for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
840		ensure!(vtxo.amount() == req.amount,
841			"unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
842		);
843		ensure!(*vtxo.policy() == req.policy,
844			"unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
845		);
846
847		// We accept the VTXO if only the hArk transition (last) failure happens
848		check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
849	}
850
851	Ok(())
852}
853
854/// Check the confirmation status of a funding tx
855///
856/// Returns true if the funding tx is confirmed deeply enough for us to accept it.
857/// The required number of confirmations depends on the wallet's configuration.
858///
859/// Returns false if the funding tx seems valid but not confirmed yet.
860///
861/// Returns an error if the chain source fails or if we can't submit the tx to the
862/// mempool, suggesting it might be double spent.
863async fn check_funding_tx_confirmations(
864	wallet: &Wallet,
865	funding_txid: Txid,
866	funding_tx: &Transaction,
867) -> anyhow::Result<bool> {
868	let tip = wallet.chain.tip().await.context("chain source error")?;
869	let conf_height = tip - wallet.config.round_tx_required_confirmations + 1;
870	let tx_status = wallet.chain.tx_status(funding_txid).await.context("chain source error")?;
871	trace!("Round funding tx {} confirmation status: {:?} (tip={})",
872		funding_txid, tx_status, tip,
873	);
874	match tx_status {
875		TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
876		TxStatus::Mempool | TxStatus::Confirmed(_) => {
877			if wallet.config.round_tx_required_confirmations == 0 {
878				debug!("Accepting round funding tx without confirmations because of configuration");
879				Ok(true)
880			} else {
881				trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
882				Ok(false)
883			}
884		},
885		TxStatus::NotFound => {
886			// let's try to submit it to our mempool
887			//TODO(stevenroose) change this to an explicit "testmempoolaccept" so that we can
888			// reliably distinguish the cases of our chain source having issues and the tx
889			// actually being rejected which suggests the round was double-spent
890			if let Err(e) = wallet.chain.broadcast_tx(&funding_tx).await {
891				Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
892					funding_txid, serialize_hex(funding_tx), e,
893				))
894			} else {
895				trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
896				Ok(false)
897			}
898		},
899	}
900}
901
/// Outcome of one attempt to progress a non-interactive (hArk) round
/// participation, as produced by `progress_non_interactive`.
enum HarkProgressResult {
	/// The server reported the participation as still pending.
	RoundPending,
	/// The server provided a funding tx, but it is not yet confirmed deeply
	/// enough for us to accept it.
	FundingTxUnconfirmed {
		/// Txid of the funding tx we are waiting on.
		funding_txid: Txid,
	},
	/// The funding tx was accepted, the output VTXOs matched our participation
	/// and the hArk VTXO swap completed.
	Ok {
		/// The round funding tx received from the server.
		funding_tx: Transaction,
		/// The output VTXOs received from the server, after the swap step.
		new_vtxos: Vec<Vtxo<Full>>,
	},
}
912
913async fn progress_non_interactive(
914	wallet: &Wallet,
915	participation: &RoundParticipation,
916	unlock_hash: UnlockHash,
917) -> Result<HarkProgressResult, HarkForfeitError> {
918	let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
919
920	let resp = srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
921		unlock_hash: unlock_hash.to_byte_array().to_vec(),
922	}).await
923		.context("error checking round participation status")
924		.map_err(HarkForfeitError::Err)?.into_inner();
925	let status = protos::RoundParticipationStatus::try_from(resp.status)
926		.context("unknown status from server")
927		.map_err(HarkForfeitError::Err)	?;
928
929	if status == protos::RoundParticipationStatus::RoundPartPending {
930		trace!("Hark round still pending");
931		return Ok(HarkProgressResult::RoundPending);
932	}
933
934	// Since we got here, we clearly don't think we're finished.
935	// So even if the server thinks we did the dance before, we need the
936	// cosignature on the leaf tx so we need to do the dance again.
937	// "Guilty feet have got no rhythm."
938	if status == protos::RoundParticipationStatus::RoundPartReleased {
939		let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
940		warn!("Server says preimage was already released for hArk participation \
941			with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
942		);
943	}
944
945	let funding_tx_bytes = resp.round_funding_tx
946		.context("funding txid should be provided when status is not pending")
947		.map_err(HarkForfeitError::Err)?;
948	let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
949		.context("invalid funding txid")
950		.map_err(HarkForfeitError::Err)?;
951	let funding_txid = funding_tx.compute_txid();
952	trace!("Funding tx for round participation with unlock hash {}: {} ({})",
953		unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
954	);
955
956	// Check the confirmation status of the funding tx
957	match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
958		Ok(true) => {},
959		Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
960		Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
961	}
962
963	let mut new_vtxos = resp.output_vtxos.into_iter()
964		.map(|v| <Vtxo<Full>>::deserialize(&v))
965		.collect::<Result<Vec<_>, _>>()
966		.context("invalid output VTXOs from server")
967		.map_err(HarkForfeitError::Err)?;
968
969	// Check that the vtxos match our participation in the exact order
970	check_round_matches_participation(participation, &new_vtxos, &funding_tx)
971		.context("new VTXOs received from server don't match our participation")
972		.map_err(HarkForfeitError::Err)?;
973
974	hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
975		.context("error forfeiting hArk VTXOs")
976		.map_err(HarkForfeitError::SentForfeits)?;
977
978	Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
979}
980
981async fn progress_attempt(
982	state: &AttemptState,
983	wallet: &Wallet,
984	part: &RoundParticipation,
985	event: &RoundEvent,
986) -> AttemptProgressResult {
987	// we will match only the states and messages required to make progress,
988	// all else we ignore, except an unexpected finish
989
990	match (state, event) {
991
992		(
993			AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
994			RoundEvent::VtxoProposal(e),
995		) => {
996			trace!("Received VtxoProposal: {:#?}", e);
997			match sign_vtxo_tree(
998				wallet,
999				part,
1000				&cosign_keys,
1001				&secret_nonces,
1002				&e.unsigned_round_tx,
1003				&e.vtxos_spec,
1004				&e.cosign_agg_nonces,
1005			).await {
1006				Ok(()) => {
1007					AttemptProgressResult::Updated {
1008						new_state: AttemptState::AwaitingFinishedRound {
1009							unsigned_round_tx: e.unsigned_round_tx.clone(),
1010							vtxos_spec: e.vtxos_spec.clone(),
1011							unlock_hash: *unlock_hash,
1012						},
1013					}
1014				},
1015				Err(e) => {
1016					trace!("Error signing VTXO tree: {:#}", e);
1017					AttemptProgressResult::Failed(e)
1018				},
1019			}
1020		},
1021
1022		(
1023			AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1024			RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1025		) => {
1026			if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1027				return AttemptProgressResult::Failed(anyhow!(
1028					"signed funding tx ({}) doesn't match tx received before ({})",
1029					signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1030				));
1031			}
1032
1033			if let Err(e) = wallet.chain.broadcast_tx(&signed_round_tx).await {
1034				warn!("Failed to broadcast signed round tx: {:#}", e);
1035			}
1036
1037			match construct_new_vtxos(
1038				part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1039			).await {
1040				Ok(v) => AttemptProgressResult::Finished {
1041					funding_tx: signed_round_tx.clone(),
1042					vtxos: v,
1043					unlock_hash: *unlock_hash,
1044				},
1045				Err(e) => AttemptProgressResult::Failed(anyhow!(
1046					"failed to construct new VTXOs for round: {:#}", e,
1047				)),
1048			}
1049		},
1050
1051		(state, RoundEvent::Finished(RoundFinished { .. })) => {
1052			AttemptProgressResult::Failed(anyhow!(
1053				"unexpectedly received a finished round while we were in state {}",
1054				state.kind(),
1055			))
1056		},
1057
1058		(state, _) => {
1059			trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1060			AttemptProgressResult::NotUpdated
1061		},
1062	}
1063}
1064
1065async fn sign_vtxo_tree(
1066	wallet: &Wallet,
1067	participation: &RoundParticipation,
1068	cosign_keys: &[Keypair],
1069	secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1070	unsigned_round_tx: &Transaction,
1071	vtxo_tree: &VtxoTreeSpec,
1072	cosign_agg_nonces: &[musig::AggregatedNonce],
1073) -> anyhow::Result<()> {
1074	let (srv, _) = wallet.require_server().await.context("server not available")?;
1075
1076	let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1077
1078	// Check that the proposal contains our inputs.
1079	let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1080	for vtxo_req in vtxo_tree.iter_vtxos() {
1081		if let Some(i) = my_vtxos.iter().position(|v| {
1082			v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1083		}) {
1084			my_vtxos.swap_remove(i);
1085		}
1086	}
1087	if !my_vtxos.is_empty() {
1088		bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1089	}
1090
1091	let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1092	let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1093	trace!("Sending vtxo signatures to server...");
1094	let _ = try_join_all(iter.map(|((req, key), sec)| async {
1095		let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1096		let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1097		let part_sigs = unsigned_vtxos.cosign_branch(
1098			&cosign_agg_nonces, leaf_idx, key, secret_nonces,
1099		).context("failed to cosign branch: our request not part of tree")?;
1100
1101		info!("Sending {} partial vtxo cosign signatures for pk {}",
1102			part_sigs.len(), key.public_key(),
1103		);
1104
1105		let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1106			pubkey: key.public_key().serialize().to_vec(),
1107			signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1108		}).await.context("error sending vtxo signatures")?;
1109
1110		Result::<(), anyhow::Error>::Ok(())
1111	})).await.context("error sending VTXO signatures")?;
1112	trace!("Done sending vtxo signatures to server");
1113
1114	Ok(())
1115}
1116
1117async fn construct_new_vtxos(
1118	participation: &RoundParticipation,
1119	unsigned_round_tx: &Transaction,
1120	vtxo_tree: &VtxoTreeSpec,
1121	vtxo_cosign_sigs: &[schnorr::Signature],
1122) -> anyhow::Result<Vec<Vtxo<Full>>> {
1123	let round_txid = unsigned_round_tx.compute_txid();
1124	let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1125	let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1126
1127	// Validate the vtxo tree and cosign signatures.
1128	if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1129		// bad server!
1130		bail!("Received incorrect vtxo cosign signatures from server");
1131	}
1132
1133	let signed_vtxos = vtxo_tree
1134		.into_signed_tree(vtxo_cosign_sigs.to_vec())
1135		.into_cached_tree();
1136
1137	let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1138	let total_nb_expected_vtxos = expected_vtxos.len();
1139
1140	let mut new_vtxos = vec![];
1141	for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1142		if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1143			let vtxo = signed_vtxos.build_vtxo(idx);
1144
1145			// validate the received vtxos
1146			// This is more like a sanity check since we crafted them ourselves.
1147			check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1148				.context("constructed invalid vtxo from tree")?;
1149
1150			info!("New VTXO from round: {} ({}, {})",
1151				vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1152			);
1153
1154			new_vtxos.push(vtxo);
1155			expected_vtxos.swap_remove(expected_idx);
1156		}
1157	}
1158
1159	if !expected_vtxos.is_empty() {
1160		if expected_vtxos.len() == total_nb_expected_vtxos {
1161			// we must have done something wrong
1162			bail!("None of our VTXOs were present in round!");
1163		} else {
1164			bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1165				expected_vtxos.len(), expected_vtxos,
1166			);
1167		}
1168	}
1169	Ok(new_vtxos)
1170}
1171
1172//TODO(stevenroose) should be made idempotent
1173async fn persist_round_success(
1174	wallet: &Wallet,
1175	participation: &RoundParticipation,
1176	movement_id: Option<MovementId>,
1177	new_vtxos: &[Vtxo<Full>],
1178	funding_tx: &Transaction,
1179) -> anyhow::Result<()> {
1180	debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1181		new_vtxos.len(), movement_id,
1182	);
1183
1184	// we first try all actions that need to happen and only afterwards return errors
1185	// so that we achieve maximum success
1186
1187	let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1188		.context("failed to store new VTXOs");
1189	let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1190		.context("failed to mark input VTXOs as spent");
1191	let update_result = if let Some(mid) = movement_id {
1192		wallet.movements.finish_movement_with_update(
1193			mid,
1194			MovementStatus::Successful,
1195			MovementUpdate::new()
1196				.produced_vtxos(new_vtxos)
1197				.metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1198		).await.context("failed to mark movement as finished")
1199	} else {
1200		Ok(())
1201	};
1202
1203	store_result?;
1204	spent_result?;
1205	update_result?;
1206
1207	Ok(())
1208}
1209
1210async fn persist_round_failure(
1211	wallet: &Wallet,
1212	participation: &RoundParticipation,
1213	movement_id: Option<MovementId>,
1214) -> anyhow::Result<()> {
1215	debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1216	let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1217	let finish_result = if let Some(movement_id) = movement_id {
1218		wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
1219	} else {
1220		Ok(())
1221	};
1222	if let Err(e) = &finish_result {
1223		error!("Failed to mark movement as failed: {:#}", e);
1224	}
1225	match (unlock_result, finish_result) {
1226		(Ok(()), Ok(())) => Ok(()),
1227		(Err(e), _) => Err(e),
1228		(_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1229	}
1230}
1231
1232async fn update_funding_txid(
1233	wallet: &Wallet,
1234	movement_id: MovementId,
1235	funding_txid: Txid,
1236) -> anyhow::Result<()> {
1237	wallet.movements.update_movement(
1238		movement_id,
1239		MovementUpdate::new()
1240			.metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1241	).await.context("Unable to update funding txid of round")
1242}
1243
1244impl Wallet {
1245	/// Ask the server when the next round is scheduled to start
1246	pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1247		let (mut srv, _) = self.require_server().await?;
1248		let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1249		Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1250	}
1251
1252	/// Start a new round participation
1253	///
1254	/// This function will store the state in the db and mark the VTXOs as locked.
1255	///
1256	/// ### Return
1257	///
1258	/// - By default, the returned state will be locked to prevent race conditions.
1259	/// To unlock the state, [StoredRoundState::unlock()] can be called.
1260	pub async fn join_next_round(
1261		&self,
1262		participation: RoundParticipation,
1263		movement_kind: Option<RoundMovement>,
1264	) -> anyhow::Result<StoredRoundState> {
1265		let movement_id = if let Some(kind) = movement_kind {
1266			Some(self.movements.new_movement_with_update(
1267				Subsystem::ROUND,
1268				kind.to_string(),
1269				participation.to_movement_update()?
1270			).await?)
1271		} else {
1272			None
1273		};
1274		let state = RoundState::new_interactive(participation, movement_id);
1275
1276		let id = self.db.store_round_state_lock_vtxos(&state).await?;
1277		let state = self.lock_wait_round_state(id).await?
1278			.context("failed to lock fresh round state")?;
1279
1280		Ok(state)
1281	}
1282
1283	/// Join next round in non-interactive or delegated mode
1284	pub async fn join_next_round_delegated(
1285		&self,
1286		participation: RoundParticipation,
1287		movement_kind: Option<RoundMovement>,
1288	) -> anyhow::Result<StoredRoundState<Unlocked>> {
1289		let (mut srv, _) = self.require_server().await?;
1290
1291		let movement_id = if let Some(kind) = movement_kind {
1292			Some(self.movements.new_movement_with_update(
1293				Subsystem::ROUND, kind.to_string(), participation.to_movement_update()?,
1294			).await?)
1295		} else {
1296			None
1297		};
1298
1299		// Get mailbox identifier for VTXO delivery
1300		let unblinded_mailbox_id = self.mailbox_identifier();
1301
1302		// Generate attestations for input vtxos
1303		let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1304		for vtxo in participation.inputs.iter() {
1305			let keypair = self.get_vtxo_key(vtxo).await
1306				.context("failed to get vtxo keypair")?;
1307			input_vtxos.push(protos::InputVtxo {
1308				vtxo_id: vtxo.id().to_bytes().to_vec(),
1309				attestation: {
1310					let attestation = DelegatedRoundParticipationAttestation::new(
1311						vtxo.id(), &participation.outputs, &keypair,
1312					);
1313					attestation.serialize()
1314				},
1315			});
1316		}
1317
1318		// Build proto VtxoRequests
1319		let vtxo_requests = participation.outputs.iter()
1320			.map(|req|
1321				protos::VtxoRequest {
1322					policy: req.policy.serialize(),
1323					amount: req.amount.to_sat(),
1324			})
1325			.collect::<Vec<_>>();
1326
1327		// Submit participation to server and get unlock_hash
1328		let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1329			input_vtxos,
1330			vtxo_requests,
1331			unblinded_mailbox_id: Some(unblinded_mailbox_id.to_vec()),
1332		}).await.context("error submitting round participation to server")?.into_inner();
1333
1334		let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1335			.context("invalid unlock hash from server")?;
1336
1337		let state = RoundState::new_non_interactive(participation, unlock_hash, movement_id);
1338
1339		info!("Non-interactive round participation submitted, it will automatically execute \
1340			when you next sync your wallet after the round happened \
1341			(and has sufficient confirmations).",
1342		);
1343
1344		let id = self.db.store_round_state_lock_vtxos(&state).await?;
1345		Ok(StoredRoundState::new(id, state))
1346	}
1347
	/// Get the IDs of all pending round states
	///
	/// Only the IDs are fetched from the db, the states themselves are not loaded.
	pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
		self.db.get_pending_round_state_ids().await
	}
1352
1353	/// Get all pending round states
1354	pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1355		let ids = self.db.get_pending_round_state_ids().await?;
1356		let mut states = Vec::with_capacity(ids.len());
1357		for id in ids {
1358			if let Some(state) = self.db.get_round_state_by_id(id).await? {
1359				states.push(state);
1360			}
1361		}
1362		Ok(states)
1363	}
1364
1365	/// Balance locked in pending rounds
1366	pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1367		let mut ret = Amount::ZERO;
1368		for round in self.pending_round_states().await? {
1369			ret += round.state().pending_balance();
1370		}
1371		Ok(ret)
1372	}
1373
1374	/// Returns all VTXOs that are locked in a pending round
1375	///
1376	/// This excludes all input VTXOs for which the output VTXOs have already
1377	/// been created.
1378	pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1379		let mut ret = Vec::new();
1380		for round in self.pending_round_states().await? {
1381			let inputs = round.state().locked_pending_inputs();
1382			ret.reserve(inputs.len());
1383			for input in inputs {
1384				let v = self.get_vtxo_by_id(input.id()).await
1385					.context("unknown round input VTXO")?;
1386				ret.push(v);
1387			}
1388		}
1389		Ok(ret)
1390	}
1391
1392	/// Sync pending rounds that have finished but are waiting for confirmations
1393	pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1394		let states = self.pending_round_states().await?;
1395		if states.is_empty() {
1396			return Ok(());
1397		}
1398
1399		debug!("Syncing {} pending round states...", states.len());
1400
1401		tokio_stream::iter(states).for_each_concurrent(10, |state| async move {
1402			// not processing events here
1403			if state.state().ongoing_participation() {
1404				return;
1405			}
1406
1407			let mut state = match self.lock_wait_round_state(state.id()).await {
1408				Ok(Some(state)) => state,
1409				Ok(None) => return,
1410				Err(e) => {
1411					warn!("Error locking round state: {:#}", e);
1412					return;
1413				},
1414			};
1415
1416			let status = state.state_mut().sync(self).await;
1417			trace!("Synced round #{}, status: {:?}", state.id(), status);
1418			match status {
1419				Ok(RoundStatus::Confirmed { funding_txid }) => {
1420					info!("Round confirmed. Funding tx {}", funding_txid);
1421					if let Err(e) = self.db.remove_round_state(&state).await {
1422						warn!("Error removing confirmed round state from db: {:#}", e);
1423					}
1424				},
1425				Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1426					info!("Waiting for confirmations for round funding tx {}", funding_txid);
1427					if let Err(e) = self.db.update_round_state(&state).await {
1428						warn!("Error updating pending round state in db: {:#}", e);
1429					}
1430				},
1431				Ok(RoundStatus::Pending) => {
1432					if let Err(e) = self.db.update_round_state(&state).await {
1433						warn!("Error updating pending round state in db: {:#}", e);
1434					}
1435				},
1436				Ok(RoundStatus::Failed { error }) => {
1437					error!("Round failed: {}", error);
1438					if let Err(e) = self.db.remove_round_state(&state).await {
1439						warn!("Error removing failed round state from db: {:#}", e);
1440					}
1441				},
1442				Ok(RoundStatus::Canceled) => {
1443					error!("Round canceled");
1444					if let Err(e) = self.db.remove_round_state(&state).await {
1445						warn!("Error removing canceled round state from db: {:#}", e);
1446					}
1447				},
1448				Err(e) => warn!("Error syncing round: {:#}", e),
1449			}
1450		}).await;
1451
1452		Ok(())
1453	}
1454
1455	/// Fetch last round event from server
1456	async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1457		let (mut srv, _) = self.require_server().await?;
1458		let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1459		Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1460	}
1461
1462	async fn inner_process_event(
1463		&self,
1464		state: &mut StoredRoundState,
1465		event: Option<&RoundEvent>,
1466	) {
1467		if let Some(event) = event && state.state().ongoing_participation() {
1468			let updated = state.state_mut().process_event(self, &event).await;
1469			if updated {
1470				if let Err(e) = self.db.update_round_state(&state).await {
1471					error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1472				}
1473			}
1474		}
1475
1476		match state.state_mut().sync(self).await {
1477			Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1478			Ok(s) if s.is_final() => {
1479				info!("Round #{} finished with result: {:?}", state.id(), s);
1480				if let Err(e) = self.db.remove_round_state(&state).await {
1481					warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1482				}
1483			},
1484			Ok(s) => {
1485				trace!("Round state #{} is now in state {:?}", state.id(), s);
1486				if let Err(e) = self.db.update_round_state(&state).await {
1487					warn!("Error storing round state #{}: {:#}", state.id(), e);
1488				}
1489			},
1490		}
1491	}
1492
1493	/// Try to make incremental progress on all pending round states
1494	///
1495	/// If the `last_round_event` argument is not provided, it will be fetched
1496	/// from the server.
1497	pub async fn progress_pending_rounds(
1498		&self,
1499		last_round_event: Option<&RoundEvent>,
1500	) -> anyhow::Result<()> {
1501		let states = self.pending_round_states().await?;
1502		if states.is_empty() {
1503			return Ok(());
1504		}
1505
1506		info!("Processing {} rounds...", states.len());
1507
1508		let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1509
1510		let has_ongoing_participation = states.iter()
1511			.any(|s| s.state().ongoing_participation());
1512		if has_ongoing_participation && last_round_event.is_none() {
1513			match self.get_last_round_event().await {
1514				Ok(e) => last_round_event = Some(Cow::Owned(e)),
1515				Err(e) => {
1516					warn!("Error fetching round event, \
1517						failed to progress ongoing rounds: {:#}", e);
1518				},
1519			}
1520		}
1521
1522		let event = last_round_event.as_ref().map(|c| c.as_ref());
1523
1524		let futs = states.into_iter().map(async |state| {
1525			let locked = self.lock_wait_round_state(state.id()).await?;
1526			if let Some(mut locked) = locked {
1527				self.inner_process_event(&mut locked, event).await;
1528			}
1529			Ok::<_, anyhow::Error>(())
1530		});
1531
1532		futures::future::join_all(futs).await;
1533
1534		Ok(())
1535	}
1536
1537	pub async fn subscribe_round_events(&self)
1538		-> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1539	{
1540		let (mut srv, _) = self.require_server().await?;
1541		let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1542			.into_inner().map(|m| {
1543				let m = m.context("received error on event stream")?;
1544				let e = RoundEvent::try_from(m.clone())
1545					.with_context(|| format!("error converting rpc round event: {:?}", m))?;
1546				trace!("Received round event: {}", e);
1547				Ok::<_, anyhow::Error>(e)
1548			});
1549		Ok(events)
1550	}
1551
1552	/// A blocking call that will try to perform a full round participation
1553	/// for all ongoing rounds
1554	///
1555	/// Returns only once there is no ongoing rounds anymore.
1556	pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1557		let mut events = self.subscribe_round_events().await?;
1558
1559		loop {
1560			// NB: we need to load all ongoing rounds on every iteration here
1561			// because some might be finished by another call
1562			let state_ids = self.pending_round_states().await?.iter()
1563				.filter(|s| s.state().ongoing_participation())
1564				.map(|s| s.id())
1565				.collect::<Vec<_>>();
1566
1567			if state_ids.is_empty() {
1568				info!("All rounds handled");
1569				return Ok(());
1570			}
1571
1572			let event = events.next().await
1573				.context("events stream broke")?
1574				.context("error on event stream")?;
1575
1576			let futs = state_ids.into_iter().map(async |state| {
1577				let locked = self.lock_wait_round_state(state).await?;
1578				if let Some(mut locked) = locked {
1579					self.inner_process_event(&mut locked, Some(&event)).await;
1580				}
1581				Ok::<_, anyhow::Error>(())
1582			});
1583
1584			futures::future::join_all(futs).await;
1585		}
1586	}
1587
1588	/// Will cancel all pending rounds that can safely be canceled
1589	///
1590	/// All rounds that have not started yet can safely be canceled,
1591	/// as well as rounds where we have not yet signed any forfeit txs.
1592	pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1593		// initial load to get all pending round states ids
1594		let state_ids = self.db.get_pending_round_state_ids().await?;
1595
1596		let futures = state_ids.into_iter().map(|state_id| {
1597			async move {
1598				// wait for lock and load again to ensure most recent state
1599				let mut state = match self.lock_wait_round_state(state_id).await {
1600					Ok(Some(s)) => s,
1601					Ok(None) => return,
1602					Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1603				};
1604
1605				match state.state_mut().try_cancel(self).await {
1606					Ok(true) => {
1607						if let Err(e) = self.db.remove_round_state(&state).await {
1608							warn!("Error removing canceled round state from db: {:#}", e);
1609						}
1610					},
1611					Ok(false) => {},
1612					Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1613				}
1614			}
1615		});
1616
1617		join_all(futures).await;
1618
1619		Ok(())
1620	}
1621
1622	/// Try to cancel the given round
1623	pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1624		let mut state = self.lock_wait_round_state(id).await?
1625			.context("round state not found")?;
1626
1627		if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1628			self.db.remove_round_state(&state).await
1629				.context("error removing canceled round state from db")?;
1630		} else {
1631			bail!("failed to cancel round");
1632		}
1633
1634		Ok(())
1635	}
1636
1637	/// Participate in a round
1638	///
1639	/// This function will start a new round participation and block until
1640	/// the round is finished.
1641	/// After this method returns the round state will be kept active until
1642	/// the round tx fully confirms.
1643	pub(crate) async fn participate_round(
1644		&self,
1645		participation: RoundParticipation,
1646		movement_kind: Option<RoundMovement>,
1647	) -> anyhow::Result<RoundStatus> {
1648		let mut state = self.join_next_round(participation, movement_kind).await?;
1649
1650		info!("Waiting for a round start...");
1651		let mut events = self.subscribe_round_events().await?;
1652
1653		loop {
1654			if !state.state().ongoing_participation() {
1655				let status = state.state_mut().sync(self).await?;
1656				match status {
1657					RoundStatus::Failed { error } => bail!("round failed: {}", error),
1658					RoundStatus::Canceled => bail!("round canceled"),
1659					status => return Ok(status),
1660				}
1661			}
1662
1663			let event = events.next().await
1664				.context("events stream broke")?
1665				.context("error on event stream")?;
1666			if state.state_mut().process_event(self, &event).await {
1667				self.db.update_round_state(&state).await?;
1668			}
1669		}
1670	}
1671}