1use std::{cmp, iter};
6use std::borrow::Cow;
7use std::convert::Infallible;
8use std::sync::Arc;
9use std::time::{Duration, SystemTime};
10use std::collections::{HashMap, HashSet};
11
12use anyhow::Context;
13use bdk_esplora::esplora_client::Amount;
14use bip39::rand;
15use bitcoin::consensus::encode::{serialize_hex, deserialize};
16use bitcoin::key::Keypair;
17use bitcoin::secp256k1::{schnorr, PublicKey};
18use bitcoin::{Address, Network, OutPoint, Transaction, Txid};
19use bitcoin::consensus::Params;
20use bitcoin::hashes::Hash;
21use futures::future::try_join_all;
22use futures::{Stream, StreamExt};
23use log::{debug, error, info, trace, warn};
24
25use ark::{OffboardRequest, ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoId, VtxoRequest};
26use ark::connectors::ConnectorChain;
27use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
28use ark::rounds::{
29 RoundAttempt, RoundEvent, RoundFinished, RoundId, RoundSeq, MIN_ROUND_TX_OUTPUTS, ROUND_TX_CONNECTOR_VOUT, ROUND_TX_VTXO_TREE_VOUT
30};
31use ark::tree::signed::{SignedVtxoTreeSpec, VtxoTreeSpec};
32use bitcoin_ext::{TxStatus, P2TR_DUST};
33use bitcoin_ext::rpc::RpcApi;
34use server_rpc::protos;
35
36use crate::{SECP, Wallet};
37use crate::movement::{MovementDestination, MovementId, MovementStatus};
38use crate::movement::update::MovementUpdate;
39use crate::onchain::{ChainSource, ChainSourceClient};
40use crate::persist::StoredRoundState;
41use crate::subsystem::{BarkSubsystem, RoundMovement};
42
/// Block interval of ten minutes.
///
/// Multiplied by the required number of confirmations to derive how long an
/// input must stay spent before a round is treated as double spent when the
/// chain source cannot name the conflicting transaction.
const BLOCK_TIME: Duration = Duration::from_secs(10 * 60);
45
46
/// What a wallet puts into a round and what it asks to get out of it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoundParticipation {
    /// VTXOs submitted as inputs of the round.
    #[serde(with = "ark::encode::serde::vec")]
    pub inputs: Vec<Vtxo>,
    /// VTXOs requested as outputs of the round.
    pub outputs: Vec<VtxoRequest>,
    /// Outputs (script pubkey plus amount) requested as offboards.
    pub offboards: Vec<OffboardRequest>,
}
57
58impl RoundParticipation {
59 pub fn to_movement_update(&self, network: Network) -> anyhow::Result<MovementUpdate> {
60 let params = Params::from(network);
61 let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
62 let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
63 let offboard_amount = self.offboards.iter().map(|r| r.amount).sum::<Amount>();
64 let fee = input_amount - output_amount - offboard_amount;
65 let intended = -offboard_amount.to_signed()?;
66 let mut sent_to = Vec::with_capacity(self.offboards.len());
67 for o in &self.offboards {
68 let address = Address::from_script(&o.script_pubkey, ¶ms)?;
69 sent_to.push(MovementDestination::new(address.to_string(), o.amount));
70 }
71 Ok(MovementUpdate::new()
72 .consumed_vtxos(&self.inputs)
73 .intended_balance(intended)
74 .effective_balance(intended - fee.to_signed()?)
75 .fee(fee)
76 .sent_to(sent_to)
77 )
78 }
79}
80
/// Outcome of syncing a [`RoundState`], as returned by [`RoundState::sync`].
#[derive(Debug, Clone)]
pub enum RoundStatus {
    /// A round funding transaction reached the required number of confirmations.
    Confirmed {
        /// Txid of the confirmed funding transaction.
        funding_txid: Txid,
    },
    /// A signed funding transaction is known but not yet sufficiently confirmed.
    Unconfirmed {
        /// Txid of the signed funding transaction.
        funding_txid: Txid,
    },
    /// No signed funding transaction is known yet.
    Pending {
        /// Txids of the funding transactions known only in unsigned form;
        /// empty when no round transaction has been seen at all.
        unsigned_funding_txids: Vec<Txid>,
    },
    /// The round failed.
    Failed {
        /// Human-readable failure reason.
        error: String,
    },
}
100
101impl RoundStatus {
102 pub fn is_final(&self) -> bool {
104 match self {
105 Self::Confirmed { .. } => true,
106 Self::Unconfirmed { .. } => false,
107 Self::Pending { .. } => false,
108 Self::Failed { .. } => true,
109 }
110 }
111
112 pub fn is_success(&self) -> bool {
114 match self {
115 Self::Confirmed { .. } => true,
116 Self::Unconfirmed { .. } => true,
117 Self::Pending { .. } => false,
118 Self::Failed { .. } => false,
119 }
120 }
121}
122
/// Everything tracked for one participation in a round: what we asked for,
/// how far the interactive flow got, and which funding transactions are
/// still awaiting confirmation.
pub struct RoundState {
    /// The inputs and requested outputs we take part with.
    pub(crate) participation: RoundParticipation,

    /// Current position in the interactive round flow.
    pub(crate) flow: RoundFlowState,

    /// Rounds for which we received a funding transaction that has not been
    /// resolved (confirmed or double spent) yet.
    pub(crate) unconfirmed_rounds: Vec<UnconfirmedRound>,

    /// Movement updated as the round progresses, if one was created.
    pub(crate) movement_id: Option<MovementId>,
}
145
146impl RoundState {
    /// Creates a state that has not joined any round attempt yet: the flow
    /// starts at `WaitingToStart` with no unconfirmed rounds.
    fn new(participation: RoundParticipation, movement_id: Option<MovementId>) -> Self {
        Self {
            participation,
            movement_id,
            flow: RoundFlowState::WaitingToStart,
            unconfirmed_rounds: Vec::new(),
        }
    }
155
    /// The participation this state was created for.
    pub fn participation(&self) -> &RoundParticipation {
        &self.participation
    }
160
    /// The current state of the interactive round flow.
    pub fn flow(&self) -> &RoundFlowState {
        &self.flow
    }
164
    /// The rounds whose funding transaction is still being tracked.
    pub fn unconfirmed_rounds(&self) -> &[UnconfirmedRound] {
        &self.unconfirmed_rounds
    }
168
169 pub fn round_has_finished(&self) -> bool {
171 match self.flow {
172 RoundFlowState::WaitingToStart => false,
173 RoundFlowState::Ongoing { .. } => false,
174 RoundFlowState::Success => true,
175 RoundFlowState::Failed { .. } => true,
176 }
177 }
178
179 async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
180 match start_attempt(wallet, &self.participation, attempt).await {
181 Ok(state) => {
182 self.flow = RoundFlowState::Ongoing {
183 round_seq: attempt.round_seq,
184 attempt_seq: attempt.attempt_seq,
185 state: state,
186 };
187 },
188 Err(e) => {
189 self.flow = RoundFlowState::Failed {
190 error: format!("{:#}", e),
191 };
192 },
193 }
194 }
195
    /// Feeds one server round event into the state machine.
    ///
    /// Returns `true` when the event changed this state (a new attempt was
    /// joined, the attempt progressed, or the flow ended) and `false` when
    /// the event was ignored.
    ///
    /// # Panics
    ///
    /// Panics if the attempt reports a finished round while
    /// `unconfirmed_rounds` is empty.
    pub async fn process_event(
        &mut self,
        wallet: &Wallet,
        event: &RoundEvent,
    ) -> bool {
        // Every arm returns, hence the `Infallible` binding.
        let _: Infallible = match self.flow {
            RoundFlowState::WaitingToStart => {
                // Only the first attempt (sequence 0) of a round is joined from here.
                if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
                    trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
                    self.try_start_attempt(wallet, e).await;
                    return true;
                } else {
                    trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
                        event.kind(), event.round_seq(), event.attempt_seq(),
                    );
                    return false;
                }
            },
            RoundFlowState::Ongoing { round_seq, attempt_seq, ref mut state } => {
                // An event from a later round means ours is over.
                if event.round_seq() > round_seq {
                    self.flow = RoundFlowState::Failed {
                        error: format!("round {} started while we were on {}",
                            event.round_seq(), round_seq,
                        ),
                    };
                    return true;
                }

                if event.attempt_seq() < attempt_seq {
                    trace!("ignoring replayed message from old attempt");
                    return false;
                }

                // A newer attempt of the same round replaces the current one.
                if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
                    trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
                    self.try_start_attempt(wallet, e).await;
                    return true;
                }
                trace!("Processing event {} for round attempt {}:{} in state {}",
                    event.kind(), round_seq, attempt_seq, state.kind(),
                );

                let mut updated = false;
                match progress_attempt(state, wallet, &self.participation, event).await {
                    AttemptProgressResult::NotUpdated => {},
                    AttemptProgressResult::Updated { new_state, new_unconfirmed_round } => {
                        if let Some(r) = new_unconfirmed_round {
                            self.unconfirmed_rounds.push(r);
                        }
                        *state = new_state;
                        updated = true;
                    },
                    AttemptProgressResult::Failed(e) => {
                        self.flow = RoundFlowState::Failed { error: format!("{:#}", e) };
                        updated = true;
                    },
                    AttemptProgressResult::Finished { signed_round_tx, vtxos } => {
                        assert!(!self.unconfirmed_rounds.is_empty());

                        let txid = signed_round_tx.compute_txid();
                        // Swap the stored unsigned tx for the signed one with the same txid.
                        if let Some(round) = self.unconfirmed_rounds.iter_mut()
                            .find(|r| r.funding_txid() == txid)
                        {
                            round.funding_tx = signed_round_tx;

                            // A persistence error is only logged; the flow still
                            // moves to `Success`.
                            if let Err(e) = persist_round_success(
                                wallet,
                                &self.participation,
                                self.movement_id,
                                &vtxos,
                                &round.funding_tx,
                            ).await {
                                error!("Error while storing succesful round: {:#}", e);
                            }

                            self.flow = RoundFlowState::Success;
                        } else {
                            self.flow = RoundFlowState::Failed {
                                error: format!("server sent signed round tx {}, \
                                    but we don't have a state for that", txid,),
                            };
                        };
                        updated = true;
                    },
                }
                return updated;
            },
            // Terminal states ignore all events.
            RoundFlowState::Success { .. } | RoundFlowState::Failed { .. } => return false,
        };
    }
293
    /// Syncs every tracked unconfirmed round with the chain and derives the
    /// overall [`RoundStatus`].
    ///
    /// Rounds reported as double spent are dropped from `unconfirmed_rounds`.
    /// When a round's funding transaction becomes signed during this call,
    /// the round success is persisted. The attached movement (if any) is
    /// updated with the funding txid and finished or failed as appropriate.
    ///
    /// # Errors
    ///
    /// Errors from syncing an individual round are only logged. An error is
    /// returned when updating the movement or persisting a round failure
    /// fails.
    pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
        let mut confirmed_funding_txid = None;
        // Manual index loop because entries may be swap-removed while iterating.
        let mut idx = 0;
        while idx < self.unconfirmed_rounds.len() {
            let round = self.unconfirmed_rounds.get_mut(idx).unwrap();

            let was_signed = round.is_tx_signed();
            let res = round.sync(wallet).await;

            // The sync may have fetched the signed version of the funding tx.
            if !was_signed && round.is_tx_signed() {
                if let Err(e) = persist_round_success(
                    wallet,
                    &self.participation,
                    self.movement_id,
                    &round.new_vtxos,
                    &round.funding_tx,
                ).await {
                    error!("Error storing state after seeing signed funding tx: {:#?}", e);
                    // Skip handling `res` for this round in this pass.
                    idx += 1;
                    continue;
                }
            }

            // Every arm continues the loop, hence the `Infallible` binding.
            let _: Infallible = match res {
                Ok(UnconfirmedRoundStatus::Confirmed) => {
                    confirmed_funding_txid = Some(round.funding_txid());
                    idx += 1;
                    continue;
                },
                Ok(UnconfirmedRoundStatus::DoubleSpent { double_spender }) => {
                    debug!("Round with round txid {} got double spent by tx {:?}",
                        round.funding_tx.compute_txid(), double_spender,
                    );
                    // `swap_remove` moves the last element into `idx`, so `idx`
                    // is deliberately not advanced.
                    self.unconfirmed_rounds.swap_remove(idx);
                    continue;
                },
                Ok(UnconfirmedRoundStatus::Unconfirmed) => {
                    idx += 1;
                    continue;
                },
                Err(e) => {
                    warn!("Error syncing status of unconfirmed round: {:#}", e);
                    trace!("Error syncing status of unconfirmed round: err={:#}; state={:?}",
                        e, round,
                    );
                    idx += 1;
                    continue;
                }
            };
        }

        let status = if let Some(funding_txid) = confirmed_funding_txid {
            if let Some(movement_id) = self.movement_id {
                update_funding_txid(funding_txid, movement_id, wallet).await?;
                wallet.movements.finish_movement(movement_id, MovementStatus::Finished).await?;
            }

            RoundStatus::Confirmed { funding_txid }
        } else if self.unconfirmed_rounds.is_empty() {
            match self.flow {
                RoundFlowState::WaitingToStart | RoundFlowState::Ongoing { .. } => {
                    RoundStatus::Pending { unsigned_funding_txids: vec![] }
                }
                // The flow succeeded, but no funding tx is left to track.
                RoundFlowState::Success => {
                    persist_round_failure(wallet, &self.participation, self.movement_id)
                        .await
                        .context("failed to persist round failure")?;
                    RoundStatus::Failed {
                        error: "all pending round funding transactions have been double spent".into(),
                    }
                },
                RoundFlowState::Failed { ref error } => {
                    persist_round_failure(wallet, &self.participation, self.movement_id)
                        .await
                        .context("failed to persist round failure")?;
                    RoundStatus::Failed { error: error.clone() }
                },
            }
        } else if let Some(signed) = self.unconfirmed_rounds.iter().find(|r| r.is_tx_signed()) {
            let funding_txid = signed.funding_txid();
            if let Some(movement_id) = self.movement_id {
                update_funding_txid(funding_txid, movement_id, wallet).await?;
            }

            RoundStatus::Unconfirmed { funding_txid }
        } else {
            RoundStatus::Pending {
                unsigned_funding_txids: self.unconfirmed_rounds.iter()
                    .map(|r| r.funding_txid())
                    .collect(),
            }
        };
        Ok(status)
    }
402
403 pub fn output_vtxos(&self) -> Option<&[Vtxo]> {
406 for round in self.unconfirmed_rounds.iter() {
407 if round.is_tx_signed() {
408 return Some(&round.new_vtxos);
409 }
410 }
411 None
412 }
413
414 pub fn locked_pending_inputs(&self) -> &[Vtxo] {
417 if self.unconfirmed_rounds.iter().any(|r| r.is_tx_signed()) {
418 return &[];
420 }
421
422 match self.flow {
423 RoundFlowState::WaitingToStart
424 | RoundFlowState::Ongoing { .. }
425 | RoundFlowState::Success =>
426 {
427 &self.participation.inputs
428 },
429 RoundFlowState::Failed { .. } => {
430 &[]
432 },
433 }
434 }
435}
436
/// Position of a [`RoundState`] in the interactive round flow.
pub enum RoundFlowState {
    /// No round attempt has been joined yet; this is the initial state.
    WaitingToStart,
    /// A round attempt has been joined and is in progress.
    Ongoing {
        /// Sequence number of the round being attempted.
        round_seq: RoundSeq,
        /// Sequence number of the attempt within that round.
        attempt_seq: usize,
        /// Progress within the attempt.
        state: AttemptState,
    },
    /// The signed round transaction was received for a round we track.
    Success,
    /// The flow ended with an error.
    Failed {
        /// Human-readable failure reason.
        error: String,
    },
}
453
/// Progress within a single round attempt.
pub enum AttemptState {
    /// Waiting for a new attempt; entered when submitting our forfeit
    /// signatures to the server failed.
    AwaitingAttempt,
    /// The payment was submitted; waiting for the server's VTXO proposal.
    AwaitingUnsignedVtxoTree {
        /// One cosign key per requested output VTXO.
        cosign_keys: Vec<Keypair>,
        /// The secret nonces generated for each cosign key, in the same order.
        secret_nonces: Vec<Vec<DangerousSecretNonce>>,
    },
    /// Our VTXO tree signatures were sent; waiting for the round proposal.
    AwaitingRoundProposal {
        /// The unsigned round transaction proposed by the server.
        unsigned_round_tx: Transaction,
        /// The VTXO tree spec proposed by the server.
        vtxos_spec: VtxoTreeSpec,
    },
    /// Our forfeit signatures were sent; waiting for the finished round.
    AwaitingFinishedRound {
        /// The unsigned round transaction we signed forfeits for.
        unsigned_round_tx: Transaction,
        /// The VTXOs we will own once the round completes.
        new_vtxos: Vec<Vtxo>,
    },
}
473
474impl AttemptState {
475 fn kind(&self) -> &'static str {
477 match self {
478 Self::AwaitingAttempt => "AwaitingAttempt",
479 Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
480 Self::AwaitingRoundProposal { .. } => "AwaitingRoundProposal",
481 Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
482 }
483 }
484}
485
/// Result of handling one event for an ongoing attempt, see `progress_attempt`.
enum AttemptProgressResult {
    /// The server delivered the signed round transaction we were waiting for.
    Finished {
        /// The signed round funding transaction.
        signed_round_tx: Transaction,
        /// The VTXOs we obtained in the round.
        vtxos: Vec<Vtxo>,
    },
    /// The attempt failed with the given error.
    Failed(anyhow::Error),
    /// The attempt moved to a new state.
    Updated {
        /// The state to continue from.
        new_state: AttemptState,
        /// A round to start tracking, set once forfeits have been signed.
        new_unconfirmed_round: Option<UnconfirmedRound>,
    },
    /// The event did not apply to the current state.
    NotUpdated,
}
504
505async fn start_attempt(
507 wallet: &Wallet,
508 participation: &RoundParticipation,
509 event: &RoundAttempt,
510) -> anyhow::Result<AttemptState> {
511 let mut srv = wallet.require_server().context("server not available")?;
512
513 let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
515 .take(participation.outputs.len())
516 .collect::<Vec<_>>();
517 let vtxo_reqs = participation.outputs.iter().zip(cosign_keys.iter()).map(|(p, ck)| {
518 SignedVtxoRequest { vtxo: p.clone(), cosign_pubkey: Some(ck.public_key()) }
519 }).collect::<Vec<_>>();
520
521 let cosign_nonces = cosign_keys.iter()
524 .map(|key| {
525 let mut secs = Vec::with_capacity(srv.info.nb_round_nonces);
526 let mut pubs = Vec::with_capacity(srv.info.nb_round_nonces);
527 for _ in 0..srv.info.nb_round_nonces {
528 let (s, p) = musig::nonce_pair(key);
529 secs.push(s);
530 pubs.push(p);
531 }
532 (secs, pubs)
533 })
534 .take(vtxo_reqs.len())
535 .collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
536
537 debug!("Submitting payment request with {} inputs, {} vtxo outputs and {} offboard outputs",
539 participation.inputs.len(), vtxo_reqs.len(), participation.offboards.len(),
540 );
541
542 srv.client.submit_payment(protos::SubmitPaymentRequest {
543 input_vtxos: participation.inputs.iter().map(|vtxo| {
544 let keypair = wallet.get_vtxo_key(&vtxo)
545 .expect("owned vtxo key should be in database");
546
547 protos::InputVtxo {
548 vtxo_id: vtxo.id().to_bytes().to_vec(),
549 ownership_proof: {
550 let sig = event.challenge.sign_with(
551 vtxo.id(), &vtxo_reqs, &participation.offboards, keypair,
552 );
553 sig.serialize().to_vec()
554 },
555 }
556 }).collect(),
557 vtxo_requests: vtxo_reqs.iter().zip(cosign_nonces.iter()).map(|(r, n)| {
558 protos::SignedVtxoRequest {
559 vtxo: Some(protos::VtxoRequest {
560 amount: r.vtxo.amount.to_sat(),
561 policy: r.vtxo.policy.serialize(),
562 }),
563 cosign_pubkey: r.cosign_pubkey.expect("just set").serialize().to_vec(),
564 public_nonces: n.1.iter().map(|n| n.serialize().to_vec()).collect(),
565 }
566 }).collect(),
567 offboard_requests: participation.offboards.iter().map(|r| {
568 protos::OffboardRequest {
569 amount: r.amount.to_sat(),
570 offboard_spk: r.script_pubkey.to_bytes(),
571 }
572 }).collect(),
573 }).await.context("Ark server refused our payment submission")?;
574
575 Ok(AttemptState::AwaitingUnsignedVtxoTree {
576 cosign_keys: cosign_keys,
577 secret_nonces: cosign_nonces.into_iter()
578 .map(|(sec, _pub)| sec.into_iter().map(DangerousSecretNonce::new).collect())
579 .collect(),
580 })
581}
582
583async fn progress_attempt(
584 state: &AttemptState,
585 wallet: &Wallet,
586 part: &RoundParticipation,
587 event: &RoundEvent,
588) -> AttemptProgressResult {
589 match (state, event) {
593
594 (
595 AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces },
596 RoundEvent::VtxoProposal(e),
597 ) => {
598 match sign_vtxo_tree(
599 wallet, part, &cosign_keys, &secret_nonces, &e.unsigned_round_tx, &e.vtxos_spec, &e.cosign_agg_nonces,
600 ).await {
601 Ok(()) => {
602 AttemptProgressResult::Updated {
603 new_state: AttemptState::AwaitingRoundProposal {
604 unsigned_round_tx: e.unsigned_round_tx.clone(),
605 vtxos_spec: e.vtxos_spec.clone(),
606 },
607 new_unconfirmed_round: None,
608 }
609 },
610 Err(e) => AttemptProgressResult::Failed(e),
611 }
612 },
613
614 (
615 AttemptState::AwaitingRoundProposal { unsigned_round_tx, vtxos_spec },
616 RoundEvent::RoundProposal(e),
617 ) => {
618 match sign_forfeits(
619 wallet, part, unsigned_round_tx, vtxos_spec, &e.cosign_sigs, &e.forfeit_nonces, e.connector_pubkey,
620 ).await {
621 Ok((new_vtxos, forfeit_sigs)) => {
622 let round = UnconfirmedRound::new(unsigned_round_tx.clone(), new_vtxos.clone());
623 match submit_forfeit_sigs(wallet, forfeit_sigs).await {
624 Ok(()) => AttemptProgressResult::Updated {
625 new_state: AttemptState::AwaitingFinishedRound {
626 unsigned_round_tx: unsigned_round_tx.clone(),
627 new_vtxos: new_vtxos,
628 },
629 new_unconfirmed_round: Some(round),
630 },
631 Err(e) => {
632 warn!("Error sending forfeit sigs to server: {:#}", e);
633 AttemptProgressResult::Updated {
634 new_state: AttemptState::AwaitingAttempt,
635 new_unconfirmed_round: Some(round),
636 }
637 },
638 }
639 },
640 Err(e) => AttemptProgressResult::Failed(e),
641 }
642 },
643
644 (
645 AttemptState::AwaitingFinishedRound { unsigned_round_tx, new_vtxos },
646 RoundEvent::Finished(RoundFinished { signed_round_tx, .. }),
647 ) => {
648 if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
649 return AttemptProgressResult::Failed(anyhow!(
650 "signed funding tx ({}) doesn't match tx received before ({})",
651 signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
652 ));
653 }
654
655 AttemptProgressResult::Finished {
656 signed_round_tx: signed_round_tx.clone(),
657 vtxos: new_vtxos.clone(),
658 }
659 },
660
661 (state, RoundEvent::Finished(RoundFinished { .. })) => {
663 AttemptProgressResult::Failed(anyhow!(
664 "unexpectedly received a finished round while we were in state {}",
665 state.kind(),
666 ))
667 },
668
669 (state, _) => {
670 trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
671 AttemptProgressResult::NotUpdated
672 },
673 }
674}
675
676async fn sign_vtxo_tree(
677 wallet: &Wallet,
678 participation: &RoundParticipation,
679 cosign_keys: &[Keypair],
680 secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
681 unsigned_round_tx: &Transaction,
682 vtxo_tree: &VtxoTreeSpec,
683 cosign_agg_nonces: &[musig::AggregatedNonce],
684) -> anyhow::Result<()> {
685 let srv = wallet.require_server().context("server not available")?;
686
687 if unsigned_round_tx.output.len() < MIN_ROUND_TX_OUTPUTS {
688 bail!("server sent round tx with less than 2 outputs: {}",
689 serialize_hex(&unsigned_round_tx),
690 );
691 }
692
693 let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
694
695 let my_vtxos = participation.outputs.iter().zip(cosign_keys.iter())
696 .map(|(r, k)| SignedVtxoRequest {
697 vtxo: r.clone(),
698 cosign_pubkey: Some(k.public_key()),
699 })
700 .collect::<Vec<_>>();
701
702 {
704 let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
705 for vtxo_req in vtxo_tree.iter_vtxos() {
706 if let Some(i) = my_vtxos.iter().position(|v| {
707 v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
708 }) {
709 my_vtxos.swap_remove(i);
710 }
711 }
712 if !my_vtxos.is_empty() {
713 bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
714 }
715
716 let mut my_offbs = participation.offboards.to_vec();
717 for offb in unsigned_round_tx.output.iter().skip(2) {
718 if let Some(i) = my_offbs.iter().position(|o| o.to_txout() == *offb) {
719 my_offbs.swap_remove(i);
720 }
721 }
722 if !my_offbs.is_empty() {
723 bail!("server didn't include all of our offboards, missing: {:?}", my_offbs);
724 }
725 }
726
727 let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
729 let iter = my_vtxos.iter().zip(cosign_keys).zip(secret_nonces);
730 let _ = try_join_all(iter.map(|((req, key), sec)| async {
731 let leaf_idx = unsigned_vtxos.spec.leaf_idx_of(req).expect("req included");
732 let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
733 let part_sigs = unsigned_vtxos.cosign_branch(
734 &cosign_agg_nonces, leaf_idx, key, secret_nonces,
735 ).context("failed to cosign branch: our request not part of tree")?;
736
737 info!("Sending {} partial vtxo cosign signatures for pk {}",
738 part_sigs.len(), key.public_key(),
739 );
740
741 let _ = srv.clone().client.provide_vtxo_signatures(protos::VtxoSignaturesRequest {
742 pubkey: key.public_key().serialize().to_vec(),
743 signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
744 }).await.context("error sending vtxo signatures")?;
745 Result::<(), anyhow::Error>::Ok(())
746 })).await.context("error sending VTXO signatures")?;
747
748 Ok(())
749}
750
751async fn sign_forfeits(
753 wallet: &Wallet,
754 participation: &RoundParticipation,
755 unsigned_round_tx: &Transaction,
756 vtxo_tree: &VtxoTreeSpec,
757 vtxo_cosign_sigs: &[schnorr::Signature],
758 forfeit_nonces: &HashMap<VtxoId, Vec<musig::PublicNonce>>,
759 connector_pubkey: PublicKey,
760) -> anyhow::Result<(Vec<Vtxo>, HashMap<VtxoId, Vec<(musig::PublicNonce, musig::PartialSignature)>>)> {
761 let srv = wallet.require_server().context("server not available")?;
762
763 let round_txid = unsigned_round_tx.compute_txid();
764 let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
765 let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
766
767 if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
769 bail!("Received incorrect vtxo cosign signatures from server");
771 }
772
773 let signed_vtxos = vtxo_tree.into_signed_tree(vtxo_cosign_sigs.to_vec());
774
775 let conn_txout = unsigned_round_tx.output.get(ROUND_TX_CONNECTOR_VOUT as usize)
777 .expect("checked before");
778 let expected_conn_txout = ConnectorChain::output(forfeit_nonces.len(), connector_pubkey);
779 if *conn_txout != expected_conn_txout {
780 bail!("round tx from server has unexpected connector output: {:?} (expected {:?})",
781 conn_txout, expected_conn_txout,
782 );
783 }
784
785 let conns_utxo = OutPoint::new(round_txid, ROUND_TX_CONNECTOR_VOUT);
786
787 let connectors = ConnectorChain::new(
789 forfeit_nonces.values().next().unwrap().len(),
790 conns_utxo,
791 connector_pubkey,
792 );
793
794 let forfeit_sigs = participation.inputs.iter().map(|vtxo| {
795 let keypair = wallet.get_vtxo_key(&vtxo)?;
796
797 let sigs = connectors.connectors().enumerate().map(|(i, (conn, _))| {
798 let (sighash, _tx) = ark::forfeit::connector_forfeit_sighash_exit(
799 vtxo, conn, connector_pubkey,
800 );
801 let srv_nonce = forfeit_nonces.get(&vtxo.id())
802 .with_context(|| format!("missing srv forfeit nonce for {}", vtxo.id()))?
803 .get(i)
804 .context("srv didn't provide enough forfeit nonces")?;
805
806 let (nonce, sig) = musig::deterministic_partial_sign(
807 &keypair,
808 [srv.info.server_pubkey],
809 &[srv_nonce],
810 sighash.to_byte_array(),
811 Some(vtxo.output_taproot().tap_tweak().to_byte_array()),
812 );
813 Ok((nonce, sig))
814 }).collect::<anyhow::Result<Vec<_>>>()?;
815
816 Ok((vtxo.id(), sigs))
817 })
818 .collect::<anyhow::Result<HashMap<_, _>>>()
819 .context("error signing forfeits")?;
820
821 let signed_vtxos = signed_vtxos.into_cached_tree();
822
823 let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
824 let total_nb_expected_vtxos = expected_vtxos.len();
825
826 let mut new_vtxos = vec![];
827 for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
828 if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
829 let vtxo = signed_vtxos.build_vtxo(idx).expect("correct leaf idx");
830
831 vtxo.validate(&unsigned_round_tx)
834 .context("constructed invalid vtxo from tree")?;
835
836 info!("New VTXO from round: {} ({}, {})",
837 vtxo.id(), vtxo.amount(), vtxo.policy_type(),
838 );
839
840 new_vtxos.push(vtxo);
841 expected_vtxos.swap_remove(expected_idx);
842 }
843 }
844
845 if !expected_vtxos.is_empty() {
846 if expected_vtxos.len() == total_nb_expected_vtxos {
847 bail!("None of our VTXOs were present in round!");
849 } else {
850 bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
851 expected_vtxos.len(), expected_vtxos,
852 );
853 }
854 }
855 Ok((new_vtxos, forfeit_sigs))
856}
857
858async fn submit_forfeit_sigs(
859 wallet: &Wallet,
860 forfeit_sigs: HashMap<VtxoId, Vec<(musig::PublicNonce, musig::PartialSignature)>>,
861) -> anyhow::Result<()> {
862 let mut srv = wallet.require_server().context("server not available")?;
863
864 debug!("Sending {} sets of forfeit signatures for our inputs", forfeit_sigs.len());
865 srv.client.provide_forfeit_signatures(protos::ForfeitSignaturesRequest {
866 signatures: forfeit_sigs.into_iter().map(|(id, sigs)| {
867 protos::ForfeitSignatures {
868 input_vtxo_id: id.to_bytes().to_vec(),
869 pub_nonces: sigs.iter().map(|s| s.0.serialize().to_vec()).collect(),
870 signatures: sigs.iter().map(|s| s.1.serialize().to_vec()).collect(),
871 }
872 }).collect(),
873 }).await.context("failed to submit forfeit signatures")?;
874
875 Ok(())
876}
877
878async fn persist_round_success(
880 wallet: &Wallet,
881 participation: &RoundParticipation,
882 movement_id: Option<MovementId>,
883 new_vtxos: &[Vtxo],
884 signed_round_tx: &Transaction,
885) -> anyhow::Result<()> {
886 debug!("Persisting newly finished round. {} new vtxos, {} offboards, movement ID {:?}",
887 new_vtxos.len(), participation.offboards.len(), movement_id,
888 );
889
890 let store_result = wallet.store_spendable_vtxos(new_vtxos);
891 let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs);
892 let update_result = if let Some(movement_id) = movement_id {
893 wallet.movements.update_movement(movement_id, MovementUpdate::new()
894 .produced_vtxos(new_vtxos)
895 .metadata([("funding_txid".into(), serde_json::to_value(signed_round_tx.compute_txid())?)])
896 ).await
897 } else {
898 Ok(())
899 };
900 match (store_result, spent_result, update_result) {
901 (Ok(()), Ok(()), Ok(())) => Ok(()),
902 (Err(e), _, _) => Err(e),
903 (_, Err(e), _) => Err(e),
904 (_, _, Err(e)) => Err(anyhow!(
905 "Failed to update movement after round success: {:#}", e
906 )),
907 }
908}
909
910async fn persist_round_failure(
912 wallet: &Wallet,
913 participation: &RoundParticipation,
914 movement_id: Option<MovementId>,
915) -> anyhow::Result<()> {
916 debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
917 let unlock_result = wallet.unlock_vtxos(&participation.inputs);
918 let finish_result = if let Some(movement_id) = movement_id {
919 wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
920 } else {
921 Ok(())
922 };
923 if let Err(e) = &finish_result {
924 error!("Failed to mark movement as failed: {:#}", e);
925 }
926 match (unlock_result, finish_result) {
927 (Ok(()), Ok(())) => Ok(()),
928 (Err(e), _) => Err(e),
929 (_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
930 }
931}
932
933async fn update_funding_txid(
934 funding_txid: Txid,
935 movement_id: MovementId,
936 wallet: &Wallet,
937) -> anyhow::Result<()> {
938 wallet.movements.update_movement(
939 movement_id,
940 MovementUpdate::new()
941 .metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
942 ).await.context("Unable to update funding txid of round")
943}
944
/// A round whose funding transaction is known but not yet resolved on chain.
#[derive(Debug)]
pub struct UnconfirmedRound {
    /// The round funding transaction; may be the unsigned version until the
    /// signed one is received from the server or fetched from the chain source.
    pub(crate) funding_tx: Transaction,
    /// The VTXOs we obtain from this round.
    pub(crate) new_vtxos: Vec<Vtxo>,

    /// For each input of `funding_tx`, the txid of another transaction seen
    /// spending that input, if any. Only filled in with an Esplora chain source.
    pub(crate) double_spenders: Vec<Option<Txid>>,

    /// When an input of `funding_tx` was first seen missing from the UTXO
    /// set. Only used with a bitcoind chain source.
    pub(crate) first_double_spent_at: Option<SystemTime>,
}
970
/// Result of syncing a single [`UnconfirmedRound`] with the chain.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum UnconfirmedRoundStatus {
    /// The funding transaction has the required number of confirmations.
    Confirmed,
    /// An input of the funding transaction was spent by something else.
    DoubleSpent {
        /// The conflicting transaction, when the chain source could identify it.
        double_spender: Option<Txid>,
    },
    /// Neither confirmed nor known to be double spent yet.
    Unconfirmed,
}
982
983impl UnconfirmedRound {
    /// Starts tracking `funding_tx`, with no known double spenders for any
    /// of its inputs.
    pub fn new(
        funding_tx: Transaction,
        new_vtxos: Vec<Vtxo>,
    ) -> Self {
        UnconfirmedRound {
            new_vtxos: new_vtxos,
            // One slot per funding tx input.
            double_spenders: vec![None; funding_tx.input.len()],
            funding_tx: funding_tx,
            first_double_spent_at: None,
        }
    }
997
    /// The txid of the round funding transaction.
    pub fn funding_txid(&self) -> Txid {
        self.funding_tx.compute_txid()
    }
1001
1002 fn is_tx_signed(&self) -> bool {
1004 !self.funding_tx.input.iter().any(|i| i.witness.is_empty())
1005 }
1006
1007 async fn maybe_update_tx(&mut self, txid: Txid, chain: &ChainSource) {
1009 if !self.is_tx_signed() {
1010 if let Ok(Some(tx)) = chain.get_tx(&txid).await {
1011 assert_eq!(txid, tx.compute_txid());
1012 debug!("Retrieved signed version of round tx {}", txid);
1013 self.funding_tx = tx;
1014 }
1015 }
1016 }
1017
1018 async fn check_if_double_spent(
1021 &mut self,
1022 wallet: &Wallet,
1023 ) -> anyhow::Result<Option<UnconfirmedRoundStatus>> {
1024
1025 let round_txid = self.funding_txid();
1026 match wallet.chain.inner() {
1027 ChainSourceClient::Esplora(c) => {
1028 let mut confirmed = None;
1029 for (idx, input) in self.funding_tx.input.iter().enumerate() {
1030 if let Some(txid) = self.double_spenders[idx] {
1031 match wallet.chain.tx_status(txid).await? {
1032 TxStatus::Confirmed(b) => {
1033 confirmed = cmp::max(confirmed, Some((b.height, txid)));
1034 continue;
1035 },
1036 TxStatus::Mempool => continue,
1037 TxStatus::NotFound => self.double_spenders[idx] = None,
1038 }
1039 }
1040
1041 let info = c.get_output_status(
1042 &input.previous_output.txid, input.previous_output.vout as u64,
1043 ).await?;
1044 match info {
1045 None => warn!("Input {} of round tx {} not found by chain source",
1046 input.previous_output, round_txid,
1047 ),
1048 Some(info) => {
1049 if !info.spent || info.txid == Some(round_txid) {
1050 continue;
1051 }
1052
1053 let txid = info.txid.context("expected txid")?;
1054 self.double_spenders[idx] = Some(txid);
1055 let status = info.status.context("expected status")?;
1056 if let Some(height) = status.block_height {
1057 confirmed = cmp::max(confirmed, Some((height, txid)));
1059 }
1060 },
1061 }
1062 }
1063
1064 if let Some((height, txid)) = confirmed {
1065 let confirmations = wallet.chain.tip().await? - (height - 1);
1066 if confirmations >= wallet.config.round_tx_required_confirmations {
1067 return Ok(Some(UnconfirmedRoundStatus::DoubleSpent {
1068 double_spender: Some(txid),
1069 }));
1070 }
1071 debug!("Round tx {} double spent by tx {} with {} confirmations",
1072 round_txid, txid, confirmations,
1073 );
1074 }
1075
1076 Ok(None)
1077 },
1078 ChainSourceClient::Bitcoind(b) => {
1079 let mut doublespent = false;
1081 for inp in &self.funding_tx.input {
1082 let OutPoint { txid, vout } = inp.previous_output;
1083 if b.get_tx_out(&txid, vout, Some(false))?.is_none() {
1084 doublespent = true;
1085 break;
1086 }
1087 }
1088
1089 if doublespent {
1090 let now = SystemTime::now();
1091 let since = self.first_double_spent_at.get_or_insert(now);
1092 let req_confs = wallet.config.round_tx_required_confirmations;
1093 let req_time = 2 * req_confs * BLOCK_TIME;
1095 if let Ok(time) = now.duration_since(*since) && time > req_time {
1096 return Ok(Some(UnconfirmedRoundStatus::DoubleSpent {
1097 double_spender: None,
1098 }));
1099 }
1100 } else {
1101 self.first_double_spent_at.take();
1102 }
1103
1104 Ok(None)
1105 },
1106 }
1107 }
1108
1109 pub(crate) async fn sync(
1111 &mut self,
1112 wallet: &Wallet,
1113 ) -> anyhow::Result<UnconfirmedRoundStatus> {
1114 let txid = self.funding_txid();
1115 match wallet.chain.tx_status(txid).await? {
1116 TxStatus::NotFound => {
1117 debug!("Round funding tx {} no longer found in mempool", txid);
1118 if let Some(res) = self.check_if_double_spent(wallet).await? {
1119 return Ok(res);
1120 }
1121 if self.is_tx_signed() {
1122 let _ = wallet.chain.broadcast_tx(&self.funding_tx).await;
1124 }
1125 Ok(UnconfirmedRoundStatus::Unconfirmed)
1126 },
1127 TxStatus::Mempool => {
1128 debug!("Round funding tx {} still in mempool, waiting for confirmations", txid);
1129 self.first_double_spent_at = None;
1130 self.maybe_update_tx(txid, &wallet.chain).await;
1131 Ok(UnconfirmedRoundStatus::Unconfirmed)
1132 },
1133 TxStatus::Confirmed(block) => {
1134 self.first_double_spent_at = None;
1135 self.maybe_update_tx(txid, &wallet.chain).await;
1136 let confirmations = {
1137 let tip = wallet.chain.tip().await?;
1138 tip - block.height + 1
1139 };
1140 debug!("Round funding tx {} has {} confirmations", txid, confirmations);
1141
1142 if confirmations >= wallet.config.round_tx_required_confirmations {
1143 Ok(UnconfirmedRoundStatus::Confirmed)
1152 } else {
1153 Ok(UnconfirmedRoundStatus::Unconfirmed)
1154 }
1155 },
1156 }
1157 }
1158}
1159
1160
1161impl Wallet {
1162 pub async fn join_next_round(
1166 &self,
1167 participation: RoundParticipation,
1168 movement_kind: Option<RoundMovement>,
1169 ) -> anyhow::Result<StoredRoundState> {
1170 if let Some(payreq) = participation.outputs.iter().find(|p| p.amount < P2TR_DUST) {
1172 bail!("VTXO amount must be at least {}, requested {}", P2TR_DUST, payreq.amount);
1173 }
1174 if let Some(offb) = participation.offboards.iter().find(|o| o.amount < P2TR_DUST) {
1175 bail!("Offboard amount must be at least {}, requested {}", P2TR_DUST, offb.amount);
1176 }
1177
1178 let movement_id = if let Some(kind) = movement_kind {
1179 let movement_id = self.movements.new_movement(
1180 self.subsystem_ids[&BarkSubsystem::Round], kind.to_string(),
1181 ).await?;
1182 let update = participation.to_movement_update(self.chain.network())?;
1183 self.movements.update_movement(movement_id, update).await?;
1184 Some(movement_id)
1185 } else {
1186 None
1187 };
1188 let state = RoundState::new(participation, movement_id);
1189
1190 let id = self.db.store_round_state_lock_vtxos(&state)?;
1191 Ok(StoredRoundState { id, state })
1192 }
1193
1194 pub fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState>> {
1196 self.db.load_round_states()
1197 }
1198
1199 pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1201 let states = self.db.load_round_states()?;
1202 if !states.is_empty() {
1203 debug!("Syncing {} pending round states...", states.len());
1204
1205 tokio_stream::iter(states).for_each_concurrent(10, |mut state| async move {
1206 if !state.state.round_has_finished() {
1208 return;
1209 }
1210
1211 match state.state.sync(self).await {
1212 Ok(RoundStatus::Confirmed { funding_txid }) => {
1213 info!("Round confirmed. Funding tx {}", funding_txid);
1214 if let Err(e) = self.db.remove_round_state(&state) {
1215 warn!("Error removing finished round state from db: {:#}", e);
1216 }
1217 },
1218 Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1219 info!("Waiting for confirmations for round funding tx {}", funding_txid);
1220 if let Err(e) = self.db.update_round_state(&state) {
1221 warn!("Error updating pending round state in db: {:#}", e);
1222 }
1223 },
1224 Ok(RoundStatus::Pending { unsigned_funding_txids: txs }) => {
1225 info!("Round still pending, potential funding txs: {:?}", txs);
1226 if let Err(e) = self.db.update_round_state(&state) {
1227 warn!("Error updating pending round state in db: {:#}", e);
1228 }
1229 },
1230 Ok(RoundStatus::Failed { error }) => {
1231 error!("Round failed: {}", error);
1232 if let Err(e) = self.db.remove_round_state(&state) {
1233 warn!("Error removing finished round state from db: {:#}", e);
1234 }
1235 },
1236 Err(e) => {
1237 warn!("Error syncing round: {:#}", e);
1238 return;
1239 },
1240 }
1241 }).await;
1242 }
1243
1244 let recovered = self.db.load_recovered_rounds()?;
1246 if !recovered.is_empty() {
1247 debug!("Syncing {} recovered past rounds...", recovered.len());
1248
1249 tokio_stream::iter(recovered).for_each_concurrent(10, |mut state| async move {
1250 match state.sync(self).await {
1251 Ok(UnconfirmedRoundStatus::Confirmed) => {
1252 info!("Recovered old round with funding txid {} confirmed",
1253 state.funding_txid(),
1254 );
1255 if let Err(e) = self.db.remove_recovered_round(state.funding_txid()) {
1256 warn!("Error removing finished recovered round from db: {:#}", e);
1257 }
1258 },
1259 Ok(UnconfirmedRoundStatus::DoubleSpent { double_spender }) => {
1260 debug!("Old recovered round {} invalidated because double spent by {:?}",
1261 state.funding_txid(), double_spender,
1262 );
1263 if let Err(e) = self.db.remove_recovered_round(state.funding_txid()) {
1264 warn!("Error invalidated recovered round from db: {:#}", e);
1265 }
1266 },
1267 Ok(UnconfirmedRoundStatus::Unconfirmed) => {},
1268 Err(e) => debug!("Error trying to progress recovered past round: {:#}", e),
1269 }
1270 }).await;
1271 }
1272
1273 Ok(())
1274 }
1275
1276 async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1278 let mut srv = self.require_server()?;
1279 let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1280 Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1281 }
1282
1283 pub async fn progress_ongoing_rounds(
1288 &self,
1289 last_round_event: Option<&RoundEvent>,
1290 ) -> anyhow::Result<()> {
1291 let states = self.db.load_round_states()?;
1292
1293 let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1295
1296 for mut state in states {
1299 if !state.state.round_has_finished() {
1300 let event = match last_round_event {
1301 Some(ref e) => e,
1302 None => match self.get_last_round_event().await {
1303 Ok(e) => {
1304 last_round_event = Some(Cow::Owned(e));
1305 last_round_event.as_ref().unwrap()
1306 },
1307 Err(e) => {
1308 warn!("Couldn't make progress on an ongoing round: {:#}", e);
1309 continue;
1310 },
1311 },
1312 };
1313
1314 let updated = state.state.process_event(self, event.as_ref()).await;
1315 if updated {
1316 self.db.update_round_state(&state)?;
1317 }
1318 }
1319
1320 let status = state.state.sync(self).await?;
1321 if status.is_final() {
1322 info!("Round finished with result: {:?}", status);
1323 if let Err(e) = self.db.remove_round_state(&state) {
1324 warn!("Failed to remove finished round from db: {:#}", e);
1325 }
1326 }
1327 }
1328
1329 Ok(())
1330 }
1331
1332 pub async fn subscribe_round_events(&self)
1333 -> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1334 {
1335 let mut srv = self.require_server()?;
1336 let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1337 .into_inner().map(|m| {
1338 let m = m.context("received error on event stream")?;
1339 let e = RoundEvent::try_from(m.clone())
1340 .with_context(|| format!("error converting rpc round event: {:?}", m))?;
1341 trace!("Received round event: {}", e);
1342 Ok::<_, anyhow::Error>(e)
1343 });
1344 Ok(events)
1345 }
1346
1347 pub(crate) async fn participate_round(
1354 &self,
1355 participation: RoundParticipation,
1356 movement_kind: Option<RoundMovement>,
1357 ) -> anyhow::Result<RoundStatus> {
1358 let mut state = self.join_next_round(participation, movement_kind).await?;
1359
1360 info!("Waiting for a round start...");
1361 let mut events = self.subscribe_round_events().await?;
1362
1363 loop {
1364 if state.state.round_has_finished() {
1365 return Ok(state.state.sync(self).await?);
1366 }
1367
1368 let event = events.next().await
1369 .context("events stream broke")?
1370 .context("error on event stream")?;
1371 if state.state.process_event(self, &event).await {
1372 self.db.update_round_state(&state)?;
1373 }
1374 }
1375 }
1376
1377 pub async fn start_sync_past_rounds(&self) -> anyhow::Result<()> {
1381 let mut srv = self.require_server()?;
1382
1383 let fresh_rounds = srv.client.get_fresh_rounds(protos::FreshRoundsRequest {
1384 last_round_txid: None,
1385 }).await?.into_inner().txids.into_iter()
1386 .map(|txid| RoundId::from_slice(&txid))
1387 .collect::<Result<Vec<_>, _>>()?;
1388
1389 if fresh_rounds.is_empty() {
1390 debug!("No new rounds to sync");
1391 return Ok(());
1392 }
1393
1394 debug!("Received {} new rounds from ark", fresh_rounds.len());
1395
1396 let last_pk_index = self.db.get_last_vtxo_key_index()?.unwrap_or_default();
1397 let pubkeys = (0..=last_pk_index).map(|idx| {
1398 self.vtxo_seed.derive_keypair(idx).public_key()
1399 }).collect::<HashSet<_>>();
1400
1401 let pending_states = Arc::new(self.db.load_recovered_rounds()?.into_iter()
1402 .map(|s| (s.funding_txid(), s))
1403 .collect::<HashMap<_, _>>());
1404
1405 let results = tokio_stream::iter(fresh_rounds).map(|round_id| {
1406 let pubkeys = pubkeys.clone();
1407 let mut srv = srv.clone();
1408 let pending_states = pending_states.clone();
1409
1410 async move {
1411 if pending_states.contains_key(&round_id.as_round_txid()) {
1413 debug!("Skipping round {} because it already exists", round_id);
1414 return Ok::<_, anyhow::Error>(());
1415 }
1416
1417 let req = protos::RoundId {
1418 txid: round_id.as_round_txid().to_byte_array().to_vec(),
1419 };
1420 let round = srv.client.get_round(req).await?.into_inner();
1421
1422 let tree = SignedVtxoTreeSpec::deserialize(&round.signed_vtxos)
1423 .context("invalid signed vtxo tree from srv")?
1424 .into_cached_tree();
1425
1426 let mut reqs = Vec::new();
1427 let mut vtxos = vec![];
1428 for (idx, dest) in tree.spec.spec.vtxos.iter().enumerate() {
1429 if pubkeys.contains(&dest.vtxo.policy.user_pubkey()) {
1430 let vtxo = tree.build_vtxo(idx).expect("correct leaf idx");
1431
1432 if self.db.get_wallet_vtxo(vtxo.id())?.is_none() {
1433 debug!("Built new vtxo {} with value {}", vtxo.id(), vtxo.amount());
1434 reqs.push(dest.vtxo.clone());
1435 vtxos.push(vtxo);
1436 } else {
1437 debug!("Not adding vtxo {} because it already exists", vtxo.id());
1438 }
1439 }
1440 }
1441
1442 let round_tx = deserialize::<Transaction>(&round.funding_tx)?;
1443
1444 let state = UnconfirmedRound::new(round_tx, vtxos);
1445 self.db.store_recovered_round(&state)?;
1446
1447 Ok(())
1448 }
1449 })
1450 .buffer_unordered(10)
1451 .collect::<Vec<_>>()
1452 .await;
1453
1454 for result in results {
1455 if let Err(e) = result {
1456 return Err(e).context("failed to sync round");
1457 }
1458 }
1459
1460 Ok(())
1461 }
1462}