bark/
round.rs

1//! Round State Machine
2//!
3//!
4
5use std::{cmp, iter};
6use std::borrow::Cow;
7use std::convert::Infallible;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use std::collections::{HashMap, HashSet};
11
12use anyhow::Context;
13use bdk_esplora::esplora_client::Amount;
14use bip39::rand;
15use bitcoin::consensus::encode::{serialize_hex, deserialize};
16use bitcoin::key::Keypair;
17use bitcoin::secp256k1::{schnorr, PublicKey};
18use bitcoin::{Address, Network, OutPoint, Transaction, Txid};
19use bitcoin::consensus::Params;
20use bitcoin::hashes::Hash;
21use futures::future::try_join_all;
22use futures::{Stream, StreamExt};
23use log::{debug, error, info, trace, warn};
24
25use ark::{OffboardRequest, ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoId, VtxoRequest};
26use ark::connectors::ConnectorChain;
27use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
28use ark::rounds::{
29	RoundAttempt, RoundEvent, RoundFinished, RoundId, RoundSeq, MIN_ROUND_TX_OUTPUTS, ROUND_TX_CONNECTOR_VOUT, ROUND_TX_VTXO_TREE_VOUT
30};
31use ark::tree::signed::{SignedVtxoTreeSpec, VtxoTreeSpec};
32use bitcoin_ext::{TxStatus, P2TR_DUST};
33use bitcoin_ext::rpc::RpcApi;
34use server_rpc::protos;
35
36use crate::{SECP, Wallet};
37use crate::movement::{MovementDestination, MovementId, MovementStatus};
38use crate::movement::update::MovementUpdate;
39use crate::onchain::{ChainSource, ChainSourceClient};
40use crate::persist::StoredRoundState;
41use crate::subsystem::{BarkSubsystem, RoundMovement};
42
43/// Bitcoin's block time of 10 minutes.
44const BLOCK_TIME: Duration = Duration::from_secs(10 * 60);
45
46
47/// Struct to communicate your specific participation for an Ark round.
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct RoundParticipation {
50	#[serde(with = "ark::encode::serde::vec")]
51	pub inputs: Vec<Vtxo>,
52	/// The output VTXOs that we request in the round,
53	/// including change
54	pub outputs: Vec<VtxoRequest>,
55	pub offboards: Vec<OffboardRequest>,
56}
57
58impl RoundParticipation {
59	pub fn to_movement_update(&self, network: Network) -> anyhow::Result<MovementUpdate> {
60		let params = Params::from(network);
61		let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
62		let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
63		let offboard_amount = self.offboards.iter().map(|r| r.amount).sum::<Amount>();
64		let fee = input_amount - output_amount - offboard_amount;
65		let intended = -offboard_amount.to_signed()?;
66		let mut sent_to = Vec::with_capacity(self.offboards.len());
67		for o in &self.offboards {
68			let address = Address::from_script(&o.script_pubkey, &params)?;
69			sent_to.push(MovementDestination::new(address.to_string(), o.amount));
70		}
71		Ok(MovementUpdate::new()
72			.consumed_vtxos(&self.inputs)
73			.intended_balance(intended)
74			.effective_balance(intended - fee.to_signed()?)
75			.fee(fee)
76			.sent_to(sent_to)
77		)
78	}
79}
80
81#[derive(Debug, Clone)]
82pub enum RoundStatus {
83	/// The round was successful and is fully confirmed
84	Confirmed {
85		funding_txid: Txid,
86	},
87	/// Round successful but not fully confirmed
88	Unconfirmed {
89		funding_txid: Txid,
90	},
91	/// We have unsigned funding transactions that might confirm
92	Pending {
93		unsigned_funding_txids: Vec<Txid>,
94	},
95	/// The round failed
96	Failed {
97		error: String,
98	},
99}
100
101impl RoundStatus {
102	/// Whether this is the final state and it won't change anymore
103	pub fn is_final(&self) -> bool {
104		match self {
105			Self::Confirmed { .. } => true,
106			Self::Unconfirmed { .. } => false,
107			Self::Pending { .. } => false,
108			Self::Failed { .. } => true,
109		}
110	}
111
112	/// Whether it looks like the round succeeded
113	pub fn is_success(&self) -> bool {
114		match self {
115			Self::Confirmed { .. } => true,
116			Self::Unconfirmed { .. } => true,
117			Self::Pending { .. } => false,
118			Self::Failed { .. } => false,
119		}
120	}
121}
122
123/// State of the progress of a round participation
124///
125/// An instance of this struct is kept all the way from the intention of joining
126/// the next round, until either the round fully confirms or it fails and we are
127/// sure it won't have any effect on our wallet.
128///
129/// As soon as we have signed forfeit txs for the round, we keep track of this
130/// round attempt until we see another attempt we participated in confirm or
131/// we gain confidence that the failed attempt will never confirm.
132pub struct RoundState {
133	/// Our participation in this round
134	pub(crate) participation: RoundParticipation,
135
136	/// The flow of the round in case it is still ongoing with the server
137	pub(crate) flow: RoundFlowState,
138
139	/// A potential final state for each round-attempt
140	pub(crate) unconfirmed_rounds: Vec<UnconfirmedRound>,
141
142	/// The ID of the [Movement] associated with this round
143	pub(crate) movement_id: Option<MovementId>,
144}
145
146impl RoundState {
147	fn new(participation: RoundParticipation, movement_id: Option<MovementId>) -> Self {
148		Self {
149			participation,
150			movement_id,
151			flow: RoundFlowState::WaitingToStart,
152			unconfirmed_rounds: Vec::new(),
153		}
154	}
155
156	/// Our participation in this round
157	pub fn participation(&self) -> &RoundParticipation {
158		&self.participation
159	}
160
161	pub fn flow(&self) -> &RoundFlowState {
162		&self.flow
163	}
164
165	pub fn unconfirmed_rounds(&self) -> &[UnconfirmedRound] {
166		&self.unconfirmed_rounds
167	}
168
169	/// Whether the interactive part of the round has finished
170	pub fn round_has_finished(&self) -> bool {
171		match self.flow {
172			RoundFlowState::WaitingToStart => false,
173			RoundFlowState::Ongoing { .. } => false,
174			RoundFlowState::Success => true,
175			RoundFlowState::Failed { .. } => true,
176		}
177	}
178
179	async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
180		match start_attempt(wallet, &self.participation, attempt).await {
181			Ok(state) => {
182				self.flow = RoundFlowState::Ongoing {
183					round_seq: attempt.round_seq,
184					attempt_seq: attempt.attempt_seq,
185					state: state,
186				};
187			},
188			Err(e) => {
189				self.flow = RoundFlowState::Failed {
190					error: format!("{:#}", e),
191				};
192			},
193		}
194	}
195
196	/// Processes the given event and returns true if some update was made to the state
197	pub async fn process_event(
198		&mut self,
199		wallet: &Wallet,
200		event: &RoundEvent,
201	) -> bool {
202		let _: Infallible = match self.flow {
203			RoundFlowState::WaitingToStart => {
204				if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
205					trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
206					self.try_start_attempt(wallet, e).await;
207					return true;
208				} else {
209					trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
210						event.kind(), event.round_seq(), event.attempt_seq(),
211					);
212					return false;
213				}
214			},
215			RoundFlowState::Ongoing { round_seq, attempt_seq, ref mut state } => {
216				// here we catch the cases where we're in a wrong flow
217
218				if event.round_seq() > round_seq {
219					// new round started, we don't support multiple parallel rounds,
220					// this means we failed
221					self.flow = RoundFlowState::Failed {
222						error: format!("round {} started while we were on {}",
223							event.round_seq(), round_seq,
224						),
225					};
226					return true;
227				}
228
229				if event.attempt_seq() < attempt_seq {
230					trace!("ignoring replayed message from old attempt");
231					return false;
232				}
233
234				if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
235					trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
236					self.try_start_attempt(wallet, e).await;
237					return true;
238				}
239				trace!("Processing event {} for round attempt {}:{} in state {}",
240					event.kind(), round_seq, attempt_seq, state.kind(),
241				);
242
243				let mut updated = false;
244				match progress_attempt(state, wallet, &self.participation, event).await {
245					AttemptProgressResult::NotUpdated => {},
246					AttemptProgressResult::Updated { new_state, new_unconfirmed_round } => {
247						if let Some(r) = new_unconfirmed_round {
248							self.unconfirmed_rounds.push(r);
249						}
250						*state = new_state;
251						updated = true;
252					},
253					AttemptProgressResult::Failed(e) => {
254						self.flow = RoundFlowState::Failed { error: format!("{:#}", e) };
255						updated = true;
256					},
257					AttemptProgressResult::Finished { signed_round_tx, vtxos } => {
258						assert!(!self.unconfirmed_rounds.is_empty());
259
260						// we need to update our UnconfirmedRound with the signed tx
261						let txid = signed_round_tx.compute_txid();
262						if let Some(round) = self.unconfirmed_rounds.iter_mut()
263							.find(|r| r.funding_txid() == txid)
264						{
265							round.funding_tx = signed_round_tx;
266
267							if let Err(e) = persist_round_success(
268								wallet,
269								&self.participation,
270								self.movement_id,
271								&vtxos,
272								&round.funding_tx,
273							).await {
274								error!("Error while storing succesful round: {:#}", e);
275								//TODO(stevenroose) make sure we call this again timely!
276							}
277
278							self.flow = RoundFlowState::Success;
279						} else {
280							self.flow = RoundFlowState::Failed {
281								error: format!("server sent signed round tx {}, \
282									but we don't have a state for that", txid,),
283							};
284						};
285						updated = true;
286					},
287				}
288				return updated;
289			},
290			RoundFlowState::Success { .. } | RoundFlowState::Failed { .. } => return false,
291		};
292	}
293
294	/// Sync the round's status and return it
295	///
296	/// When success or failure is returned, the round state can be eliminated
297	pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
298		let mut confirmed_funding_txid = None;
299		let mut idx = 0;
300		while idx < self.unconfirmed_rounds.len() {
301			let round = self.unconfirmed_rounds.get_mut(idx).unwrap();
302
303			let was_signed = round.is_tx_signed();
304			let res = round.sync(wallet).await;
305
306			// if we just saw the signed tx, issue the new VTXOs and mark movement as OK
307			//TODO(stevenroose) after we make `persist_round_success` idempotent,
308			// just always go into this branch when we have a signed tx to make
309			// sure a db failure get fixed on the next call of `sync`.
310			if !was_signed && round.is_tx_signed() {
311				if let Err(e) = persist_round_success(
312					wallet,
313					&self.participation,
314					self.movement_id,
315					&round.new_vtxos,
316					&round.funding_tx,
317				).await {
318					error!("Error storing state after seeing signed funding tx: {:#?}", e);
319					idx += 1;
320					continue;
321				}
322			}
323
324			//TODO(stevenroose) after the persist methods are idempotent, also
325			// persist the round as failed here if a signed round tx is no longer in
326			// the mempool
327
328			let _: Infallible = match res {
329				Ok(UnconfirmedRoundStatus::Confirmed) => {
330					confirmed_funding_txid = Some(round.funding_txid());
331					// let's not remove this one just to be sure we can't
332					// accidentally lose track of it
333					// we should remove the entire state after this anyway
334					idx += 1;
335					continue;
336				},
337				Ok(UnconfirmedRoundStatus::DoubleSpent { double_spender }) => {
338					debug!("Round with round txid {} got double spent by tx {:?}",
339						round.funding_tx.compute_txid(), double_spender,
340					);
341					self.unconfirmed_rounds.swap_remove(idx);
342					continue; // skip idx increment
343				},
344				Ok(UnconfirmedRoundStatus::Unconfirmed) => {
345					idx += 1;
346					continue;
347				},
348				Err(e) => {
349					warn!("Error syncing status of unconfirmed round: {:#}", e);
350					trace!("Error syncing status of unconfirmed round: err={:#}; state={:?}",
351						e, round,
352					);
353					idx += 1;
354					continue;
355				}
356			};
357		}
358
359		let status = if let Some(funding_txid) = confirmed_funding_txid {
360			if let Some(movement_id) = self.movement_id {
361				update_funding_txid(funding_txid, movement_id, wallet).await?;
362				wallet.movements.finish_movement(movement_id, MovementStatus::Finished).await?;
363			}
364
365			RoundStatus::Confirmed { funding_txid }
366		} else if self.unconfirmed_rounds.is_empty() {
367			match self.flow {
368				RoundFlowState::WaitingToStart | RoundFlowState::Ongoing { .. } => {
369					RoundStatus::Pending { unsigned_funding_txids: vec![] }
370				}
371				RoundFlowState::Success => {
372					persist_round_failure(wallet, &self.participation, self.movement_id)
373						.await
374						.context("failed to persist round failure")?;
375					RoundStatus::Failed {
376						error: "all pending round funding transactions have been double spent".into(),
377					}
378				},
379				RoundFlowState::Failed { ref error } => {
380					persist_round_failure(wallet, &self.participation, self.movement_id)
381						.await
382						.context("failed to persist round failure")?;
383					RoundStatus::Failed { error: error.clone() }
384				},
385			}
386		} else if let Some(signed) = self.unconfirmed_rounds.iter().find(|r| r.is_tx_signed()) {
387			let funding_txid = signed.funding_txid();
388			if let Some(movement_id) = self.movement_id {
389				update_funding_txid(funding_txid, movement_id, wallet).await?;
390			}
391
392			RoundStatus::Unconfirmed { funding_txid }
393		} else {
394			RoundStatus::Pending {
395				unsigned_funding_txids: self.unconfirmed_rounds.iter()
396					.map(|r| r.funding_txid())
397					.collect(),
398			}
399		};
400		Ok(status)
401	}
402
403	/// Once we know the signed round funding tx, this returns the output VTXOs
404	/// for this round.
405	pub fn output_vtxos(&self) -> Option<&[Vtxo]> {
406		for round in self.unconfirmed_rounds.iter() {
407			if round.is_tx_signed() {
408				return Some(&round.new_vtxos);
409			}
410		}
411		None
412	}
413
414	/// Returns the input VTXOs that are locked in this round, but only
415	/// if no output VTXOs were issued yet.
416	pub fn locked_pending_inputs(&self) -> &[Vtxo] {
417		if self.unconfirmed_rounds.iter().any(|r| r.is_tx_signed()) {
418			// new vtxos aready issued
419			return &[];
420		}
421
422		match self.flow {
423			RoundFlowState::WaitingToStart
424				| RoundFlowState::Ongoing { .. }
425				| RoundFlowState::Success =>
426			{
427				&self.participation.inputs
428			},
429			RoundFlowState::Failed { .. } => {
430				// inputs already unlocked
431				&[]
432			},
433		}
434	}
435}
436
437/// The state of the process flow of a round
438///
439/// This tracks the progress of the interactive part of the round, from
440/// waiting to start until finishing either succesfully or with a failure.
441pub enum RoundFlowState {
442	WaitingToStart,
443	Ongoing {
444		round_seq: RoundSeq,
445		attempt_seq: usize,
446		state: AttemptState,
447	},
448	Success,
449	Failed {
450		error: String,
451	},
452}
453
454/// The state of a single round attempt
455///
456/// For each attempt that we participate in, we keep the state of our concrete
457/// participation.
458pub enum AttemptState {
459	AwaitingAttempt,
460	AwaitingUnsignedVtxoTree {
461		cosign_keys: Vec<Keypair>,
462		secret_nonces: Vec<Vec<DangerousSecretNonce>>,
463	},
464	AwaitingRoundProposal {
465		unsigned_round_tx: Transaction,
466		vtxos_spec: VtxoTreeSpec,
467	},
468	AwaitingFinishedRound {
469		unsigned_round_tx: Transaction,
470		new_vtxos: Vec<Vtxo>,
471	},
472}
473
474impl AttemptState {
475	/// The state kind represented as a string
476	fn kind(&self) -> &'static str {
477		match self {
478			Self::AwaitingAttempt => "AwaitingAttempt",
479			Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
480			Self::AwaitingRoundProposal { .. } => "AwaitingRoundProposal",
481			Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
482		}
483	}
484}
485
486/// Result from trying to progress an ongoing round attempt
487enum AttemptProgressResult {
488	Finished {
489		signed_round_tx: Transaction,
490		vtxos: Vec<Vtxo>,
491	},
492	Failed(anyhow::Error),
493	/// When the state changes, this variant is returned
494	///
495	/// If during the processing, we have signed any forfeit txs and tried
496	/// sending them to the server, the [UnconfirmedRound] instance is returned
497	/// so that it can be stored in the state.
498	Updated {
499		new_state: AttemptState,
500		new_unconfirmed_round: Option<UnconfirmedRound>,
501	},
502	NotUpdated,
503}
504
505/// Participate in the new round attempt by submitting our round participation
506async fn start_attempt(
507	wallet: &Wallet,
508	participation: &RoundParticipation,
509	event: &RoundAttempt,
510) -> anyhow::Result<AttemptState> {
511	let mut srv = wallet.require_server().context("server not available")?;
512
513	// Assign cosign pubkeys to the payment requests.
514	let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
515		.take(participation.outputs.len())
516		.collect::<Vec<_>>();
517	let vtxo_reqs = participation.outputs.iter().zip(cosign_keys.iter()).map(|(p, ck)| {
518		SignedVtxoRequest { vtxo: p.clone(), cosign_pubkey: Some(ck.public_key()) }
519	}).collect::<Vec<_>>();
520
521	// Prepare round participation info.
522	// For each of our requested vtxo output, we need a set of public and secret nonces.
523	let cosign_nonces = cosign_keys.iter()
524		.map(|key| {
525			let mut secs = Vec::with_capacity(srv.info.nb_round_nonces);
526			let mut pubs = Vec::with_capacity(srv.info.nb_round_nonces);
527			for _ in 0..srv.info.nb_round_nonces {
528				let (s, p) = musig::nonce_pair(key);
529				secs.push(s);
530				pubs.push(p);
531			}
532			(secs, pubs)
533		})
534		.take(vtxo_reqs.len())
535		.collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
536
537	// The round has now started. We can submit our payment.
538	debug!("Submitting payment request with {} inputs, {} vtxo outputs and {} offboard outputs",
539		participation.inputs.len(), vtxo_reqs.len(), participation.offboards.len(),
540	);
541
542	srv.client.submit_payment(protos::SubmitPaymentRequest {
543		input_vtxos: participation.inputs.iter().map(|vtxo| {
544			let keypair = wallet.get_vtxo_key(&vtxo)
545				.expect("owned vtxo key should be in database");
546
547			protos::InputVtxo {
548				vtxo_id: vtxo.id().to_bytes().to_vec(),
549				ownership_proof: {
550					let sig = event.challenge.sign_with(
551						vtxo.id(), &vtxo_reqs, &participation.offboards, keypair,
552					);
553					sig.serialize().to_vec()
554				},
555			}
556		}).collect(),
557		vtxo_requests: vtxo_reqs.iter().zip(cosign_nonces.iter()).map(|(r, n)| {
558			protos::SignedVtxoRequest {
559				vtxo: Some(protos::VtxoRequest {
560					amount: r.vtxo.amount.to_sat(),
561					policy: r.vtxo.policy.serialize(),
562				}),
563				cosign_pubkey: r.cosign_pubkey.expect("just set").serialize().to_vec(),
564				public_nonces: n.1.iter().map(|n| n.serialize().to_vec()).collect(),
565			}
566		}).collect(),
567		offboard_requests: participation.offboards.iter().map(|r| {
568			protos::OffboardRequest {
569				amount: r.amount.to_sat(),
570				offboard_spk: r.script_pubkey.to_bytes(),
571			}
572		}).collect(),
573	}).await.context("Ark server refused our payment submission")?;
574
575	Ok(AttemptState::AwaitingUnsignedVtxoTree {
576		cosign_keys: cosign_keys,
577		secret_nonces: cosign_nonces.into_iter()
578			.map(|(sec, _pub)| sec.into_iter().map(DangerousSecretNonce::new).collect())
579			.collect(),
580	})
581}
582
583async fn progress_attempt(
584	state: &AttemptState,
585	wallet: &Wallet,
586	part: &RoundParticipation,
587	event: &RoundEvent,
588) -> AttemptProgressResult {
589	// we will match only the states and messages required to make progress,
590	// all else we ignore, except an unexpected finish
591
592	match (state, event) {
593
594		(
595			AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces },
596			RoundEvent::VtxoProposal(e),
597		) => {
598			match sign_vtxo_tree(
599				wallet, part, &cosign_keys, &secret_nonces, &e.unsigned_round_tx, &e.vtxos_spec, &e.cosign_agg_nonces,
600			).await {
601				Ok(()) => {
602					AttemptProgressResult::Updated {
603						new_state: AttemptState::AwaitingRoundProposal {
604							unsigned_round_tx: e.unsigned_round_tx.clone(),
605							vtxos_spec: e.vtxos_spec.clone(),
606						},
607						new_unconfirmed_round: None,
608					}
609				},
610				Err(e) => AttemptProgressResult::Failed(e),
611			}
612		},
613
614		(
615			AttemptState::AwaitingRoundProposal { unsigned_round_tx, vtxos_spec },
616			RoundEvent::RoundProposal(e),
617		) => {
618			match sign_forfeits(
619				wallet, part, unsigned_round_tx, vtxos_spec, &e.cosign_sigs, &e.forfeit_nonces, e.connector_pubkey,
620			).await {
621				Ok((new_vtxos, forfeit_sigs)) => {
622					let round = UnconfirmedRound::new(unsigned_round_tx.clone(), new_vtxos.clone());
623					match submit_forfeit_sigs(wallet, forfeit_sigs).await {
624						Ok(()) => AttemptProgressResult::Updated {
625							new_state: AttemptState::AwaitingFinishedRound {
626								unsigned_round_tx: unsigned_round_tx.clone(),
627								new_vtxos: new_vtxos,
628							},
629							new_unconfirmed_round: Some(round),
630						},
631						Err(e) => {
632							warn!("Error sending forfeit sigs to server: {:#}", e);
633							AttemptProgressResult::Updated {
634								new_state: AttemptState::AwaitingAttempt,
635								new_unconfirmed_round: Some(round),
636							}
637						},
638					}
639				},
640				Err(e) => AttemptProgressResult::Failed(e),
641			}
642		},
643
644		(
645			AttemptState::AwaitingFinishedRound { unsigned_round_tx, new_vtxos },
646			RoundEvent::Finished(RoundFinished { signed_round_tx, .. }),
647		) => {
648			if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
649				return AttemptProgressResult::Failed(anyhow!(
650					"signed funding tx ({}) doesn't match tx received before ({})",
651					signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
652				));
653			}
654
655			AttemptProgressResult::Finished {
656				signed_round_tx: signed_round_tx.clone(),
657				vtxos: new_vtxos.clone(),
658			}
659		},
660
661		// unexpected finish
662		(state, RoundEvent::Finished(RoundFinished { .. })) => {
663			AttemptProgressResult::Failed(anyhow!(
664				"unexpectedly received a finished round while we were in state {}",
665				state.kind(),
666			))
667		},
668
669		(state, _) => {
670			trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
671			AttemptProgressResult::NotUpdated
672		},
673	}
674}
675
676async fn sign_vtxo_tree(
677	wallet: &Wallet,
678	participation: &RoundParticipation,
679	cosign_keys: &[Keypair],
680	secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
681	unsigned_round_tx: &Transaction,
682	vtxo_tree: &VtxoTreeSpec,
683	cosign_agg_nonces: &[musig::AggregatedNonce],
684) -> anyhow::Result<()> {
685	let srv = wallet.require_server().context("server not available")?;
686
687	if unsigned_round_tx.output.len() < MIN_ROUND_TX_OUTPUTS {
688		bail!("server sent round tx with less than 2 outputs: {}",
689			serialize_hex(&unsigned_round_tx),
690		);
691	}
692
693	let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
694
695	let my_vtxos = participation.outputs.iter().zip(cosign_keys.iter())
696		.map(|(r, k)| SignedVtxoRequest {
697			vtxo: r.clone(),
698			cosign_pubkey: Some(k.public_key()),
699		})
700		.collect::<Vec<_>>();
701
702	// Check that the proposal contains our inputs.
703	{
704		let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
705		for vtxo_req in vtxo_tree.iter_vtxos() {
706			if let Some(i) = my_vtxos.iter().position(|v| {
707				v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
708			}) {
709				my_vtxos.swap_remove(i);
710			}
711		}
712		if !my_vtxos.is_empty() {
713			bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
714		}
715
716		let mut my_offbs = participation.offboards.to_vec();
717		for offb in unsigned_round_tx.output.iter().skip(2) {
718			if let Some(i) = my_offbs.iter().position(|o| o.to_txout() == *offb) {
719				my_offbs.swap_remove(i);
720			}
721		}
722		if !my_offbs.is_empty() {
723			bail!("server didn't include all of our offboards, missing: {:?}", my_offbs);
724		}
725	}
726
727	// Make vtxo signatures from top to bottom, just like sighashes are returned.
728	let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
729	let iter = my_vtxos.iter().zip(cosign_keys).zip(secret_nonces);
730	let _ = try_join_all(iter.map(|((req, key), sec)| async {
731		let leaf_idx = unsigned_vtxos.spec.leaf_idx_of(req).expect("req included");
732		let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
733		let part_sigs = unsigned_vtxos.cosign_branch(
734			&cosign_agg_nonces, leaf_idx, key, secret_nonces,
735		).context("failed to cosign branch: our request not part of tree")?;
736
737		info!("Sending {} partial vtxo cosign signatures for pk {}",
738			part_sigs.len(), key.public_key(),
739		);
740
741		let _ = srv.clone().client.provide_vtxo_signatures(protos::VtxoSignaturesRequest {
742			pubkey: key.public_key().serialize().to_vec(),
743			signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
744		}).await.context("error sending vtxo signatures")?;
745		Result::<(), anyhow::Error>::Ok(())
746	})).await.context("error sending VTXO signatures")?;
747
748	Ok(())
749}
750
751/// Sign the forfeit signatures but doesn't submit them yet
752async fn sign_forfeits(
753	wallet: &Wallet,
754	participation: &RoundParticipation,
755	unsigned_round_tx: &Transaction,
756	vtxo_tree: &VtxoTreeSpec,
757	vtxo_cosign_sigs: &[schnorr::Signature],
758	forfeit_nonces: &HashMap<VtxoId, Vec<musig::PublicNonce>>,
759	connector_pubkey: PublicKey,
760) -> anyhow::Result<(Vec<Vtxo>, HashMap<VtxoId, Vec<(musig::PublicNonce, musig::PartialSignature)>>)> {
761	let srv = wallet.require_server().context("server not available")?;
762
763	let round_txid = unsigned_round_tx.compute_txid();
764	let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
765	let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
766
767	// Validate the vtxo tree and cosign signatures.
768	if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
769		// bad server!
770		bail!("Received incorrect vtxo cosign signatures from server");
771	}
772
773	let signed_vtxos = vtxo_tree.into_signed_tree(vtxo_cosign_sigs.to_vec());
774
775	// Check that the connector key is correct.
776	let conn_txout = unsigned_round_tx.output.get(ROUND_TX_CONNECTOR_VOUT as usize)
777		.expect("checked before");
778	let expected_conn_txout = ConnectorChain::output(forfeit_nonces.len(), connector_pubkey);
779	if *conn_txout != expected_conn_txout {
780		bail!("round tx from server has unexpected connector output: {:?} (expected {:?})",
781			conn_txout, expected_conn_txout,
782		);
783	}
784
785	let conns_utxo = OutPoint::new(round_txid, ROUND_TX_CONNECTOR_VOUT);
786
787	// Make forfeit signatures.
788	let connectors = ConnectorChain::new(
789		forfeit_nonces.values().next().unwrap().len(),
790		conns_utxo,
791		connector_pubkey,
792	);
793
794	let forfeit_sigs = participation.inputs.iter().map(|vtxo| {
795		let keypair = wallet.get_vtxo_key(&vtxo)?;
796
797		let sigs = connectors.connectors().enumerate().map(|(i, (conn, _))| {
798			let (sighash, _tx) = ark::forfeit::connector_forfeit_sighash_exit(
799				vtxo, conn, connector_pubkey,
800			);
801			let srv_nonce = forfeit_nonces.get(&vtxo.id())
802				.with_context(|| format!("missing srv forfeit nonce for {}", vtxo.id()))?
803				.get(i)
804				.context("srv didn't provide enough forfeit nonces")?;
805
806			let (nonce, sig) = musig::deterministic_partial_sign(
807				&keypair,
808				[srv.info.server_pubkey],
809				&[srv_nonce],
810				sighash.to_byte_array(),
811				Some(vtxo.output_taproot().tap_tweak().to_byte_array()),
812			);
813			Ok((nonce, sig))
814		}).collect::<anyhow::Result<Vec<_>>>()?;
815
816		Ok((vtxo.id(), sigs))
817	})
818		.collect::<anyhow::Result<HashMap<_, _>>>()
819		.context("error signing forfeits")?;
820
821	let signed_vtxos = signed_vtxos.into_cached_tree();
822
823	let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
824	let total_nb_expected_vtxos = expected_vtxos.len();
825
826	let mut new_vtxos = vec![];
827	for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
828		if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
829			let vtxo = signed_vtxos.build_vtxo(idx).expect("correct leaf idx");
830
831			// validate the received vtxos
832			// This is more like a sanity check since we crafted them ourselves.
833			vtxo.validate(&unsigned_round_tx)
834				.context("constructed invalid vtxo from tree")?;
835
836			info!("New VTXO from round: {} ({}, {})",
837				vtxo.id(), vtxo.amount(), vtxo.policy_type(),
838			);
839
840			new_vtxos.push(vtxo);
841			expected_vtxos.swap_remove(expected_idx);
842		}
843	}
844
845	if !expected_vtxos.is_empty() {
846		if expected_vtxos.len() == total_nb_expected_vtxos {
847			// we must have done something wrong
848			bail!("None of our VTXOs were present in round!");
849		} else {
850			bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
851				expected_vtxos.len(), expected_vtxos,
852			);
853		}
854	}
855	Ok((new_vtxos, forfeit_sigs))
856}
857
858async fn submit_forfeit_sigs(
859	wallet: &Wallet,
860	forfeit_sigs: HashMap<VtxoId, Vec<(musig::PublicNonce, musig::PartialSignature)>>,
861) -> anyhow::Result<()> {
862	let mut srv = wallet.require_server().context("server not available")?;
863
864	debug!("Sending {} sets of forfeit signatures for our inputs", forfeit_sigs.len());
865	srv.client.provide_forfeit_signatures(protos::ForfeitSignaturesRequest {
866		signatures: forfeit_sigs.into_iter().map(|(id, sigs)| {
867			protos::ForfeitSignatures {
868				input_vtxo_id: id.to_bytes().to_vec(),
869				pub_nonces: sigs.iter().map(|s| s.0.serialize().to_vec()).collect(),
870				signatures: sigs.iter().map(|s| s.1.serialize().to_vec()).collect(),
871			}
872		}).collect(),
873	}).await.context("failed to submit forfeit signatures")?;
874
875	Ok(())
876}
877
878//TODO(stevenroose) should be made idempotent
879async fn persist_round_success(
880	wallet: &Wallet,
881	participation: &RoundParticipation,
882	movement_id: Option<MovementId>,
883	new_vtxos: &[Vtxo],
884	signed_round_tx: &Transaction,
885) -> anyhow::Result<()> {
886	debug!("Persisting newly finished round. {} new vtxos, {} offboards, movement ID {:?}",
887		new_vtxos.len(), participation.offboards.len(), movement_id,
888	);
889
890	let store_result = wallet.store_spendable_vtxos(new_vtxos);
891	let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs);
892	let update_result = if let Some(movement_id) = movement_id {
893		wallet.movements.update_movement(movement_id, MovementUpdate::new()
894			.produced_vtxos(new_vtxos)
895			.metadata([("funding_txid".into(), serde_json::to_value(signed_round_tx.compute_txid())?)])
896		).await
897	} else {
898		Ok(())
899	};
900	match (store_result, spent_result, update_result) {
901		(Ok(()), Ok(()), Ok(())) => Ok(()),
902		(Err(e), _, _) => Err(e),
903		(_, Err(e), _) => Err(e),
904		(_, _, Err(e)) => Err(anyhow!(
905			"Failed to update movement after round success: {:#}", e
906		)),
907	}
908}
909
910//TODO(stevenroose) should be made idempotent
911async fn persist_round_failure(
912	wallet: &Wallet,
913	participation: &RoundParticipation,
914	movement_id: Option<MovementId>,
915) -> anyhow::Result<()> {
916	debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
917	let unlock_result = wallet.unlock_vtxos(&participation.inputs);
918	let finish_result = if let Some(movement_id) = movement_id {
919		wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
920	} else {
921		Ok(())
922	};
923	if let Err(e) = &finish_result {
924		error!("Failed to mark movement as failed: {:#}", e);
925	}
926	match (unlock_result, finish_result) {
927		(Ok(()), Ok(())) => Ok(()),
928		(Err(e), _) => Err(e),
929		(_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
930	}
931}
932
933async fn update_funding_txid(
934	funding_txid: Txid,
935	movement_id: MovementId,
936	wallet: &Wallet,
937) -> anyhow::Result<()> {
938	wallet.movements.update_movement(
939		movement_id,
940		MovementUpdate::new()
941			.metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
942	).await.context("Unable to update funding txid of round")
943}
944
945/// Track any round for which we signed forfeit txs
946///
947/// Any round for which we signed forfeit txs will be tracked in an object like this.
948/// Both when the round finished successfully or not. The funding tx in this object
949/// can thus be unsigned.
950#[derive(Debug)]
951pub struct UnconfirmedRound {
952	/// This round tx is not necessarily signed
953	pub(crate) funding_tx: Transaction,
954	pub(crate) new_vtxos: Vec<Vtxo>,
955
956	// Some information for double spend detection
957
958	/// A txid that double spends each input of the round tx
959	pub(crate) double_spenders: Vec<Option<Txid>>,
960
961	/// The time at which we first noticed we got double spent
962	///
963	/// We use this when the user is using bitcoind because in bitcoind it's
964	/// impossible to detect which txs spend a UTXO. So in order to detect
965	/// whether our tx is double spend, we will just abort the round
966	/// if we are out of the mempool for double the expected time to be
967	/// deeply double spent.
968	pub(crate) first_double_spent_at: Option<SystemTime>,
969}
970
971#[derive(Debug, Clone, PartialEq, Eq)]
972pub(crate) enum UnconfirmedRoundStatus {
973	/// The round's funding tx confirmed deeply
974	Confirmed,
975	/// The round's funding tx was double spent deeply
976	DoubleSpent {
977		/// We can't always know the double spender
978		double_spender: Option<Txid>,
979	},
980	Unconfirmed,
981}
982
983impl UnconfirmedRound {
984	/// Create a new instance of the [AwaitingConfirmation] state for
985	/// a round that was synced from the server, but we have lost context for.
986	pub fn new(
987		funding_tx: Transaction,
988		new_vtxos: Vec<Vtxo>,
989	) -> Self {
990		UnconfirmedRound {
991			new_vtxos: new_vtxos,
992			double_spenders: vec![None; funding_tx.input.len()],
993			funding_tx: funding_tx,
994			first_double_spent_at: None,
995		}
996	}
997
998	pub fn funding_txid(&self) -> Txid {
999		self.funding_tx.compute_txid()
1000	}
1001
1002	/// Whether we have a signed tx
1003	fn is_tx_signed(&self) -> bool {
1004		!self.funding_tx.input.iter().any(|i| i.witness.is_empty())
1005	}
1006
1007	/// Check if our version of the round tx is signed and if not try replace it
1008	async fn maybe_update_tx(&mut self, txid: Txid, chain: &ChainSource) {
1009		if !self.is_tx_signed() {
1010			if let Ok(Some(tx)) = chain.get_tx(&txid).await {
1011				assert_eq!(txid, tx.compute_txid());
1012				debug!("Retrieved signed version of round tx {}", txid);
1013				self.funding_tx = tx;
1014			}
1015		}
1016	}
1017
1018	/// Check if the round tx was double spent and if so
1019	/// returns a UnconfirmedRoundStatus::DoubleSpent.
1020	async fn check_if_double_spent(
1021		&mut self,
1022		wallet: &Wallet,
1023	) -> anyhow::Result<Option<UnconfirmedRoundStatus>> {
1024
1025		let round_txid = self.funding_txid();
1026		match wallet.chain.inner() {
1027			ChainSourceClient::Esplora(c) => {
1028				let mut confirmed = None;
1029				for (idx, input) in self.funding_tx.input.iter().enumerate() {
1030					if let Some(txid) = self.double_spenders[idx] {
1031						match wallet.chain.tx_status(txid).await? {
1032							TxStatus::Confirmed(b) => {
1033								confirmed = cmp::max(confirmed, Some((b.height, txid)));
1034								continue;
1035							},
1036							TxStatus::Mempool => continue,
1037							TxStatus::NotFound => self.double_spenders[idx] = None,
1038						}
1039					}
1040
1041					let info = c.get_output_status(
1042						&input.previous_output.txid, input.previous_output.vout as u64,
1043					).await?;
1044					match info {
1045						None => warn!("Input {} of round tx {} not found by chain source",
1046							input.previous_output, round_txid,
1047						),
1048						Some(info) => {
1049							if !info.spent || info.txid == Some(round_txid) {
1050								continue;
1051							}
1052
1053							let txid = info.txid.context("expected txid")?;
1054							self.double_spenders[idx] = Some(txid);
1055							let status = info.status.context("expected status")?;
1056							if let Some(height) = status.block_height {
1057								// NB we rely on Ord impl that sorts tuples first by first item
1058								confirmed = cmp::max(confirmed, Some((height, txid)));
1059							}
1060						},
1061					}
1062				}
1063
1064				if let Some((height, txid)) = confirmed {
1065					let confirmations = wallet.chain.tip().await? - (height - 1);
1066					if confirmations >= wallet.config.round_tx_required_confirmations {
1067						return Ok(Some(UnconfirmedRoundStatus::DoubleSpent {
1068							double_spender: Some(txid),
1069						}));
1070					}
1071					debug!("Round tx {} double spent by tx {} with {} confirmations",
1072						round_txid, txid, confirmations,
1073					);
1074				}
1075
1076				Ok(None)
1077			},
1078			ChainSourceClient::Bitcoind(b) => {
1079				// check whether our round tx is double spent
1080				let mut doublespent = false;
1081				for inp in &self.funding_tx.input {
1082					let OutPoint { txid, vout } = inp.previous_output;
1083					if b.get_tx_out(&txid, vout, Some(false))?.is_none() {
1084						doublespent = true;
1085						break;
1086					}
1087				}
1088
1089				if doublespent {
1090					let now = SystemTime::now();
1091					let since = self.first_double_spent_at.get_or_insert(now);
1092					let req_confs = wallet.config.round_tx_required_confirmations;
1093					//TODO(stevenroose) maybe also do 5 days?
1094					let req_time = 2 * req_confs * BLOCK_TIME;
1095					if let Ok(time) = now.duration_since(*since) && time > req_time {
1096						return Ok(Some(UnconfirmedRoundStatus::DoubleSpent {
1097							double_spender: None,
1098						}));
1099					}
1100				} else {
1101					self.first_double_spent_at.take();
1102				}
1103
1104				Ok(None)
1105			},
1106		}
1107	}
1108
1109	// NB we must never restart again from here
1110	pub(crate) async fn sync(
1111		&mut self,
1112		wallet: &Wallet,
1113	) -> anyhow::Result<UnconfirmedRoundStatus> {
1114		let txid = self.funding_txid();
1115		match wallet.chain.tx_status(txid).await? {
1116			TxStatus::NotFound => {
1117				debug!("Round funding tx {} no longer found in mempool", txid);
1118				if let Some(res) = self.check_if_double_spent(wallet).await? {
1119					return Ok(res);
1120				}
1121				if self.is_tx_signed() {
1122					// try to broadcast
1123					let _ = wallet.chain.broadcast_tx(&self.funding_tx).await;
1124				}
1125				Ok(UnconfirmedRoundStatus::Unconfirmed)
1126			},
1127			TxStatus::Mempool => {
1128				debug!("Round funding tx {} still in mempool, waiting for confirmations", txid);
1129				self.first_double_spent_at = None;
1130				self.maybe_update_tx(txid, &wallet.chain).await;
1131				Ok(UnconfirmedRoundStatus::Unconfirmed)
1132			},
1133			TxStatus::Confirmed(block) => {
1134				self.first_double_spent_at = None;
1135				self.maybe_update_tx(txid, &wallet.chain).await;
1136				let confirmations = {
1137					let tip = wallet.chain.tip().await?;
1138					tip - block.height + 1
1139				};
1140				debug!("Round funding tx {} has {} confirmations", txid, confirmations);
1141
1142				if confirmations >= wallet.config.round_tx_required_confirmations {
1143					//TODO(stevenroose) ensure vtxos are created
1144					// we currently make the movement when the round finishes, and
1145					// we currently don't have a way to not accidentally do this twice.
1146					// Should probably have some idempotent VTXO state/movement API
1147					// so that here we can call this again in the case of:
1148					// - initial call after finished round failed
1149					// - we recovered from a round we didn't get finish message from
1150					// - we synced a round from when we were offline
1151					Ok(UnconfirmedRoundStatus::Confirmed)
1152				} else {
1153					Ok(UnconfirmedRoundStatus::Unconfirmed)
1154				}
1155			},
1156		}
1157	}
1158}
1159
1160
1161impl Wallet {
1162	/// Start a new round participation
1163	///
1164	/// This function will store the state in the db and mark the VTXOs as locked.
1165	pub async fn join_next_round(
1166		&self,
1167		participation: RoundParticipation,
1168		movement_kind: Option<RoundMovement>,
1169	) -> anyhow::Result<StoredRoundState> {
1170		// verify if our participation makes sense
1171		if let Some(payreq) = participation.outputs.iter().find(|p| p.amount < P2TR_DUST) {
1172			bail!("VTXO amount must be at least {}, requested {}", P2TR_DUST, payreq.amount);
1173		}
1174		if let Some(offb) = participation.offboards.iter().find(|o| o.amount < P2TR_DUST) {
1175			bail!("Offboard amount must be at least {}, requested {}", P2TR_DUST, offb.amount);
1176		}
1177
1178		let movement_id = if let Some(kind) = movement_kind {
1179			let movement_id = self.movements.new_movement(
1180				self.subsystem_ids[&BarkSubsystem::Round], kind.to_string(),
1181			).await?;
1182			let update = participation.to_movement_update(self.chain.network())?;
1183			self.movements.update_movement(movement_id, update).await?;
1184			Some(movement_id)
1185		} else {
1186			None
1187		};
1188		let state = RoundState::new(participation, movement_id);
1189
1190		let id = self.db.store_round_state_lock_vtxos(&state)?;
1191		Ok(StoredRoundState { id, state })
1192	}
1193
1194	/// Get all pending round states
1195	pub fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState>> {
1196		self.db.load_round_states()
1197	}
1198
1199	/// Sync pending rounds that have finished but are waiting for confirmations
1200	pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1201		let states = self.db.load_round_states()?;
1202		if !states.is_empty() {
1203			debug!("Syncing {} pending round states...", states.len());
1204
1205			tokio_stream::iter(states).for_each_concurrent(10, |mut state| async move {
1206				// not processing events here
1207				if !state.state.round_has_finished() {
1208					return;
1209				}
1210
1211				match state.state.sync(self).await {
1212					Ok(RoundStatus::Confirmed { funding_txid }) => {
1213						info!("Round confirmed. Funding tx {}", funding_txid);
1214						if let Err(e) = self.db.remove_round_state(&state) {
1215							warn!("Error removing finished round state from db: {:#}", e);
1216						}
1217					},
1218					Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1219						info!("Waiting for confirmations for round funding tx {}", funding_txid);
1220						if let Err(e) = self.db.update_round_state(&state) {
1221							warn!("Error updating pending round state in db: {:#}", e);
1222						}
1223					},
1224					Ok(RoundStatus::Pending { unsigned_funding_txids: txs }) => {
1225						info!("Round still pending, potential funding txs: {:?}", txs);
1226						if let Err(e) = self.db.update_round_state(&state) {
1227							warn!("Error updating pending round state in db: {:#}", e);
1228						}
1229					},
1230					Ok(RoundStatus::Failed { error }) => {
1231						error!("Round failed: {}", error);
1232						if let Err(e) = self.db.remove_round_state(&state) {
1233							warn!("Error removing finished round state from db: {:#}", e);
1234						}
1235					},
1236					Err(e) => {
1237						warn!("Error syncing round: {:#}", e);
1238						return;
1239					},
1240				}
1241			}).await;
1242		}
1243
1244		// also sync recovered states if we have any
1245		let recovered = self.db.load_recovered_rounds()?;
1246		if !recovered.is_empty() {
1247			debug!("Syncing {} recovered past rounds...", recovered.len());
1248
1249			tokio_stream::iter(recovered).for_each_concurrent(10, |mut state| async move {
1250				match state.sync(self).await {
1251					Ok(UnconfirmedRoundStatus::Confirmed) => {
1252						info!("Recovered old round with funding txid {} confirmed",
1253							state.funding_txid(),
1254						);
1255						if let Err(e) = self.db.remove_recovered_round(state.funding_txid()) {
1256							warn!("Error removing finished recovered round from db: {:#}", e);
1257						}
1258					},
1259					Ok(UnconfirmedRoundStatus::DoubleSpent { double_spender }) => {
1260						debug!("Old recovered round {} invalidated because double spent by {:?}",
1261							state.funding_txid(), double_spender,
1262						);
1263						if let Err(e) = self.db.remove_recovered_round(state.funding_txid()) {
1264							warn!("Error invalidated recovered round from db: {:#}", e);
1265						}
1266					},
1267					Ok(UnconfirmedRoundStatus::Unconfirmed) => {},
1268					Err(e) => debug!("Error trying to progress recovered past round: {:#}", e),
1269				}
1270			}).await;
1271		}
1272
1273		Ok(())
1274	}
1275
1276	/// Fetch last round event from server
1277	async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1278		let mut srv = self.require_server()?;
1279		let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1280		Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1281	}
1282
1283	/// Try to make incrimental progress on all pending round states
1284	///
1285	/// If the `last_round_event` argument is not provided, it will be fetched
1286	/// from the server.
1287	pub async fn progress_ongoing_rounds(
1288		&self,
1289		last_round_event: Option<&RoundEvent>,
1290	) -> anyhow::Result<()> {
1291		let states = self.db.load_round_states()?;
1292
1293		// so we can fill an owned one in case we lazily fetch one
1294		let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1295
1296		// NB we want to try make progress on all states,
1297		// so we shouldn't error/abort early
1298		for mut state in states {
1299			if !state.state.round_has_finished() {
1300				let event = match last_round_event {
1301					Some(ref e) => e,
1302					None => match self.get_last_round_event().await {
1303						Ok(e) => {
1304							last_round_event = Some(Cow::Owned(e));
1305							last_round_event.as_ref().unwrap()
1306						},
1307						Err(e) => {
1308							warn!("Couldn't make progress on an ongoing round: {:#}", e);
1309							continue;
1310						},
1311					},
1312				};
1313
1314				let updated = state.state.process_event(self, event.as_ref()).await;
1315				if updated {
1316					self.db.update_round_state(&state)?;
1317				}
1318			}
1319
1320			let status = state.state.sync(self).await?;
1321			if status.is_final() {
1322				info!("Round finished with result: {:?}", status);
1323				if let Err(e) = self.db.remove_round_state(&state) {
1324					warn!("Failed to remove finished round from db: {:#}", e);
1325				}
1326			}
1327		}
1328
1329		Ok(())
1330	}
1331
1332	pub async fn subscribe_round_events(&self)
1333		-> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1334	{
1335		let mut srv = self.require_server()?;
1336		let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1337			.into_inner().map(|m| {
1338				let m = m.context("received error on event stream")?;
1339				let e = RoundEvent::try_from(m.clone())
1340					.with_context(|| format!("error converting rpc round event: {:?}", m))?;
1341				trace!("Received round event: {}", e);
1342				Ok::<_, anyhow::Error>(e)
1343			});
1344		Ok(events)
1345	}
1346
1347	/// Participate in a round
1348	///
1349	/// This function will start a new round participation and block until
1350	/// the round is finished.
1351	/// After this method returns the round state will be kept active until
1352	/// the round tx fully confirms.
1353	pub(crate) async fn participate_round(
1354		&self,
1355		participation: RoundParticipation,
1356		movement_kind: Option<RoundMovement>,
1357	) -> anyhow::Result<RoundStatus> {
1358		let mut state = self.join_next_round(participation, movement_kind).await?;
1359
1360		info!("Waiting for a round start...");
1361		let mut events = self.subscribe_round_events().await?;
1362
1363		loop {
1364			if state.state.round_has_finished() {
1365				return Ok(state.state.sync(self).await?);
1366			}
1367
1368			let event = events.next().await
1369				.context("events stream broke")?
1370				.context("error on event stream")?;
1371			if state.state.process_event(self, &event).await {
1372				self.db.update_round_state(&state)?;
1373			}
1374		}
1375	}
1376
1377	/// Look for past rounds that might contain some of our VTXOs
1378	///
1379	/// Afterwards, call [Wallet::sync_pending_rounds] to make progress on these.
1380	pub async fn start_sync_past_rounds(&self) -> anyhow::Result<()> {
1381		let mut srv = self.require_server()?;
1382
1383		let fresh_rounds = srv.client.get_fresh_rounds(protos::FreshRoundsRequest {
1384			last_round_txid: None,
1385		}).await?.into_inner().txids.into_iter()
1386			.map(|txid| RoundId::from_slice(&txid))
1387			.collect::<Result<Vec<_>, _>>()?;
1388
1389		if fresh_rounds.is_empty() {
1390			debug!("No new rounds to sync");
1391			return Ok(());
1392		}
1393
1394		debug!("Received {} new rounds from ark", fresh_rounds.len());
1395
1396		let last_pk_index = self.db.get_last_vtxo_key_index()?.unwrap_or_default();
1397		let pubkeys = (0..=last_pk_index).map(|idx| {
1398			self.vtxo_seed.derive_keypair(idx).public_key()
1399		}).collect::<HashSet<_>>();
1400
1401		let pending_states = Arc::new(self.db.load_recovered_rounds()?.into_iter()
1402			.map(|s| (s.funding_txid(), s))
1403			.collect::<HashMap<_, _>>());
1404
1405		let results = tokio_stream::iter(fresh_rounds).map(|round_id| {
1406			let pubkeys = pubkeys.clone();
1407			let mut srv = srv.clone();
1408			let pending_states = pending_states.clone();
1409
1410			async move {
1411				//TODO(stevenroose) detect if we already are aware
1412				if pending_states.contains_key(&round_id.as_round_txid()) {
1413					debug!("Skipping round {} because it already exists", round_id);
1414					return Ok::<_, anyhow::Error>(());
1415				}
1416
1417				let req = protos::RoundId {
1418					txid: round_id.as_round_txid().to_byte_array().to_vec(),
1419				};
1420				let round = srv.client.get_round(req).await?.into_inner();
1421
1422				let tree = SignedVtxoTreeSpec::deserialize(&round.signed_vtxos)
1423					.context("invalid signed vtxo tree from srv")?
1424					.into_cached_tree();
1425
1426				let mut reqs = Vec::new();
1427				let mut vtxos = vec![];
1428				for (idx, dest) in tree.spec.spec.vtxos.iter().enumerate() {
1429					if pubkeys.contains(&dest.vtxo.policy.user_pubkey()) {
1430						let vtxo = tree.build_vtxo(idx).expect("correct leaf idx");
1431
1432						if self.db.get_wallet_vtxo(vtxo.id())?.is_none() {
1433							debug!("Built new vtxo {} with value {}", vtxo.id(), vtxo.amount());
1434							reqs.push(dest.vtxo.clone());
1435							vtxos.push(vtxo);
1436						} else {
1437							debug!("Not adding vtxo {} because it already exists", vtxo.id());
1438						}
1439					}
1440				}
1441
1442				let round_tx = deserialize::<Transaction>(&round.funding_tx)?;
1443
1444				let state = UnconfirmedRound::new(round_tx, vtxos);
1445				self.db.store_recovered_round(&state)?;
1446
1447				Ok(())
1448			}
1449		})
1450			.buffer_unordered(10)
1451			.collect::<Vec<_>>()
1452			.await;
1453
1454		for result in results {
1455			if let Err(e) = result {
1456				return Err(e).context("failed to sync round");
1457			}
1458		}
1459
1460		Ok(())
1461	}
1462}