1use std::collections::HashMap;
6use std::iter;
7use std::borrow::Cow;
8use std::convert::Infallible;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use anyhow::Context;
13use ark::vtxo::VtxoValidationError;
14use bdk_esplora::esplora_client::Amount;
15use bip39::rand;
16use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
17use bitcoin::consensus::encode::{deserialize, serialize_hex};
18use bitcoin::hashes::Hash;
19use bitcoin::hex::DisplayHex;
20use bitcoin::key::Keypair;
21use bitcoin::secp256k1::schnorr;
22use futures::future::{join_all, try_join_all};
23use futures::{Stream, StreamExt};
24use log::{debug, error, info, trace, warn};
25
26use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
27use ark::vtxo::Full;
28use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
29use ark::forfeit::HashLockedForfeitBundle;
30use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
31use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
32use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
33use bitcoin_ext::TxStatus;
34use server_rpc::{protos, ServerConnection, TryFromBytes};
35
36use crate::{SECP, Wallet, WalletVtxo};
37use crate::movement::{MovementId, MovementStatus};
38use crate::movement::update::MovementUpdate;
39use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
40
41const ROUND_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
44use crate::subsystem::{RoundMovement, Subsystem};
45
46
47const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
52pub struct RoundParticipation {
53 #[serde(with = "ark::encode::serde::vec")]
54 pub inputs: Vec<Vtxo<Full>>,
55 pub outputs: Vec<VtxoRequest>,
58 #[serde(default, skip_serializing_if = "Option::is_none", with = "ark::encode::serde::opt")]
60 pub unblinded_mailbox_id: Option<ark::mailbox::MailboxIdentifier>,
61}
62
63impl RoundParticipation {
64 pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
65 let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
66 let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
67 let fee = input_amount - output_amount;
68 Ok(MovementUpdate::new()
69 .consumed_vtxos(&self.inputs)
70 .intended_balance(SignedAmount::ZERO)
71 .effective_balance( - fee.to_signed()?)
72 .fee(fee)
73 )
74 }
75}
76
77#[derive(Debug, Clone)]
78pub enum RoundStatus {
79 Confirmed {
81 funding_txid: Txid,
82 },
83 Unconfirmed {
85 funding_txid: Txid,
86 },
87 Pending,
89 Failed {
91 error: String,
92 },
93 Canceled,
95}
96
97impl RoundStatus {
98 pub fn is_final(&self) -> bool {
100 match self {
101 Self::Confirmed { .. } => true,
102 Self::Unconfirmed { .. } => false,
103 Self::Pending => false,
104 Self::Failed { .. } => true,
105 Self::Canceled => true,
106 }
107 }
108
109 pub fn is_success(&self) -> bool {
111 match self {
112 Self::Confirmed { .. } => true,
113 Self::Unconfirmed { .. } => true,
114 Self::Pending => false,
115 Self::Failed { .. } => false,
116 Self::Canceled => false,
117 }
118 }
119}
120
121pub struct RoundState {
134 pub(crate) done: bool,
136
137 pub(crate) participation: RoundParticipation,
139
140 pub(crate) flow: RoundFlowState,
142
143 pub(crate) new_vtxos: Vec<Vtxo<Full>>,
148
149 pub(crate) sent_forfeit_sigs: bool,
156
157 pub(crate) movement_id: Option<MovementId>,
159}
160
161impl RoundState {
162 fn new_interactive(
163 participation: RoundParticipation,
164 movement_id: Option<MovementId>,
165 ) -> Self {
166 Self {
167 participation,
168 movement_id,
169 flow: RoundFlowState::InteractivePending,
170 new_vtxos: Vec::new(),
171 sent_forfeit_sigs: false,
172 done: false,
173 }
174 }
175
176 fn new_non_interactive(
177 participation: RoundParticipation,
178 unlock_hash: UnlockHash,
179 movement_id: Option<MovementId>,
180 ) -> Self {
181 Self {
182 participation,
183 movement_id,
184 flow: RoundFlowState::NonInteractivePending { unlock_hash },
185 new_vtxos: Vec::new(),
186 sent_forfeit_sigs: false,
187 done: false,
188 }
189 }
190
191 pub fn participation(&self) -> &RoundParticipation {
193 &self.participation
194 }
195
196 pub fn unlock_hash(&self) -> Option<UnlockHash> {
198 match self.flow {
199 RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
200 RoundFlowState::InteractivePending => None,
201 RoundFlowState::InteractiveOngoing { .. } => None,
202 RoundFlowState::Failed { .. } => None,
203 RoundFlowState::Canceled => None,
204 RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
205 }
206 }
207
208 pub fn funding_tx(&self) -> Option<&Transaction> {
209 match self.flow {
210 RoundFlowState::NonInteractivePending { .. } => None,
211 RoundFlowState::InteractivePending => None,
212 RoundFlowState::InteractiveOngoing { .. } => None,
213 RoundFlowState::Failed { .. } => None,
214 RoundFlowState::Canceled => None,
215 RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
216 }
217 }
218
219 pub fn ongoing_participation(&self) -> bool {
221 match self.flow {
222 RoundFlowState::NonInteractivePending { .. } => false,
223 RoundFlowState::InteractivePending => true,
224 RoundFlowState::InteractiveOngoing { .. } => true,
225 RoundFlowState::Failed { .. } => false,
226 RoundFlowState::Canceled => false,
227 RoundFlowState::Finished { .. } => false,
228 }
229 }
230
231 pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
234 let ret = match self.flow {
235 RoundFlowState::NonInteractivePending { .. } => {
236 bail!("it is currently not yet possible to cancel pending delegated rounds");
238 },
239 RoundFlowState::Canceled => true,
240 RoundFlowState::Failed { .. } => true,
241 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
242 self.flow = RoundFlowState::Canceled;
243 true
244 },
245 RoundFlowState::Finished { .. } => false,
246 };
247 if ret {
248 persist_round_failure(wallet, &self.participation, self.movement_id).await
249 .context("failed to persist round failure for cancelation")?;
250 }
251 Ok(ret)
252 }
253
254 async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
255 match start_attempt(wallet, &self.participation, attempt).await {
256 Ok(state) => {
257 self.flow = RoundFlowState::InteractiveOngoing {
258 round_seq: attempt.round_seq,
259 attempt_seq: attempt.attempt_seq,
260 state: state,
261 };
262 },
263 Err(e) => {
264 self.flow = RoundFlowState::Failed {
265 error: format!("{:#}", e),
266 };
267 },
268 }
269 }
270
271 pub async fn process_event(
273 &mut self,
274 wallet: &Wallet,
275 event: &RoundEvent,
276 ) -> bool {
277 let _: Infallible = match self.flow {
278 RoundFlowState::InteractivePending => {
279 if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
280 trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
281 self.try_start_attempt(wallet, e).await;
282 return true;
283 } else {
284 trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
285 event.kind(), event.round_seq(), event.attempt_seq(),
286 );
287 return false;
288 }
289 },
290 RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
291 if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
294 warn!("Round {} failed by server", round_seq);
295 self.flow = RoundFlowState::Failed {
296 error: format!("round {} failed by server", round_seq),
297 };
298 return true;
299 }
300
301 if event.round_seq() > round_seq {
302 self.flow = RoundFlowState::Failed {
305 error: format!("round {} started while we were on {}",
306 event.round_seq(), round_seq,
307 ),
308 };
309 return true;
310 }
311
312 if event.attempt_seq() < attempt_seq {
313 trace!("ignoring replayed message from old attempt");
314 return false;
315 }
316
317 if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
318 trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
319 self.try_start_attempt(wallet, e).await;
320 return true;
321 }
322 trace!("Processing event {} for round attempt {}:{} in state {}",
323 event.kind(), round_seq, attempt_seq, state.kind(),
324 );
325
326 return match progress_attempt(state, wallet, &self.participation, event).await {
327 AttemptProgressResult::NotUpdated => false,
328 AttemptProgressResult::Updated { new_state } => {
329 *state = new_state;
330 true
331 },
332 AttemptProgressResult::Failed(e) => {
333 warn!("Round failed with error: {:#}", e);
334 self.flow = RoundFlowState::Failed {
335 error: format!("{:#}", e),
336 };
337 true
338 },
339 AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
340 self.new_vtxos = vtxos;
341 let funding_txid = funding_tx.compute_txid();
342 self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
343 if let Some(mid) = self.movement_id {
344 if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
345 warn!("Error updating the round funding txid: {:#}", e);
346 }
347 }
348 true
349 },
350 };
351 },
352 RoundFlowState::NonInteractivePending { .. }
353 | RoundFlowState::Finished { .. }
354 | RoundFlowState::Failed { .. }
355 | RoundFlowState::Canceled => return false,
356 };
357 }
358
359 pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
364 match self.flow {
365 RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
366 Ok(RoundStatus::Confirmed {
367 funding_txid: funding_tx.compute_txid(),
368 })
369 },
370
371 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
372 Ok(RoundStatus::Pending)
373 },
374 RoundFlowState::Failed { ref error } => {
375 persist_round_failure(wallet, &self.participation, self.movement_id).await
376 .context("failed to persist round failure")?;
377 Ok(RoundStatus::Failed { error: error.clone() })
378 },
379 RoundFlowState::Canceled => {
380 persist_round_failure(wallet, &self.participation, self.movement_id).await
381 .context("failed to persist round failure")?;
382 Ok(RoundStatus::Canceled)
383 },
384
385 RoundFlowState::NonInteractivePending { unlock_hash } => {
386 match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
387 Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
388 Ok(HarkProgressResult::RoundNotFound) => {
389 self.handle_round_not_found(wallet).await
390 },
391 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
392 let funding_txid = funding_tx.compute_txid();
393 self.new_vtxos = new_vtxos;
394 self.flow = RoundFlowState::Finished {
395 funding_tx: funding_tx.clone(),
396 unlock_hash: unlock_hash,
397 };
398
399 persist_round_success(
400 wallet,
401 &self.participation,
402 self.movement_id,
403 &self.new_vtxos,
404 &funding_tx,
405 ).await.context("failed to store successful round in DB!")?;
406
407 self.done = true;
408
409 Ok(RoundStatus::Confirmed { funding_txid })
410 },
411 Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
412 if let Some(mid) = self.movement_id {
413 update_funding_txid(wallet, mid, funding_txid).await
414 .context("failed to update funding txid in DB")?;
415 }
416 Ok(RoundStatus::Unconfirmed { funding_txid })
417 },
418
419 Err(HarkForfeitError::Err(e)) => {
422 Err(e.context("error progressing non-interactive round"))
426 },
427 Err(HarkForfeitError::SentForfeits(e)) => {
428 self.sent_forfeit_sigs = true;
429 Err(e.context("error progressing non-interactive round \
430 after sending forfeit tx signatures"))
431 },
432 }
433 },
434 RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
436 let funding_txid = funding_tx.compute_txid();
437 let confirmed = check_funding_tx_confirmations(
438 wallet, funding_txid, &funding_tx,
439 ).await.context("error checking funding tx confirmations")?;
440 if !confirmed {
441 trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
442 return Ok(RoundStatus::Unconfirmed { funding_txid });
443 }
444
445 match hark_vtxo_swap(
446 wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
447 ).await {
448 Ok(()) => {
449 persist_round_success(
450 wallet,
451 &self.participation,
452 self.movement_id,
453 &self.new_vtxos,
454 &funding_tx,
455 ).await.context("failed to store successful round in DB!")?;
456
457 self.done = true;
458
459 Ok(RoundStatus::Confirmed { funding_txid })
460 },
461 Err(HarkForfeitError::Err(e)) => {
462 Err(e.context("error forfeiting VTXOs after round"))
463 },
464 Err(HarkForfeitError::SentForfeits(e)) => {
465 self.sent_forfeit_sigs = true;
466 Err(e.context("error after having signed and sent \
467 forfeit signatures to server"))
468 },
469 }
470 },
471 }
472 }
473
474 pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
477 if self.new_vtxos.is_empty() {
478 None
479 } else {
480 Some(&self.new_vtxos)
481 }
482 }
483
484 pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
487 match self.flow {
489 RoundFlowState::NonInteractivePending { .. }
490 | RoundFlowState::InteractivePending
491 | RoundFlowState::InteractiveOngoing { .. }
492 => {
493 &self.participation.inputs
494 },
495 RoundFlowState::Finished { .. } => if self.done {
496 &[]
498 } else {
499 &self.participation.inputs
500 },
501 RoundFlowState::Failed { .. }
502 | RoundFlowState::Canceled
503 => {
504 &[]
506 },
507 }
508 }
509
510 pub fn pending_balance(&self) -> Amount {
514 if self.done {
515 return Amount::ZERO;
516 }
517
518 match self.flow {
519 RoundFlowState::NonInteractivePending { .. }
520 | RoundFlowState::InteractivePending
521 | RoundFlowState::InteractiveOngoing { .. }
522 | RoundFlowState::Finished { .. }
523 => {
524 self.participation.outputs.iter().map(|o| o.amount).sum()
525 },
526 RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
527 Amount::ZERO
528 },
529 }
530 }
531
532 async fn handle_round_not_found(
539 &mut self,
540 wallet: &Wallet,
541 ) -> anyhow::Result<RoundStatus> {
542 info!("Server reports round participation not found (no forfeits sent)");
543 self.flow = RoundFlowState::Failed {
544 error: "server reports round participation not found".into(),
545 };
546 persist_round_failure(wallet, &self.participation, self.movement_id).await
547 .context("failed to persist round failure")?;
548
549 Ok(RoundStatus::Failed {
550 error: "server reports round participation not found".into(),
551 })
552 }
553}
554
555pub enum RoundFlowState {
560 NonInteractivePending {
562 unlock_hash: UnlockHash,
563 },
564
565 InteractivePending,
567 InteractiveOngoing {
569 round_seq: RoundSeq,
570 attempt_seq: usize,
571 state: AttemptState,
572 },
573
574 Finished {
576 funding_tx: Transaction,
577 unlock_hash: UnlockHash,
578 },
579
580 Failed {
582 error: String,
583 },
584
585 Canceled,
587}
588
589pub enum AttemptState {
594 AwaitingAttempt,
595 AwaitingUnsignedVtxoTree {
596 cosign_keys: Vec<Keypair>,
597 secret_nonces: Vec<Vec<DangerousSecretNonce>>,
598 unlock_hash: UnlockHash,
599 },
600 AwaitingFinishedRound {
601 unsigned_round_tx: Transaction,
602 vtxos_spec: VtxoTreeSpec,
603 unlock_hash: UnlockHash,
604 },
605}
606
607impl AttemptState {
608 fn kind(&self) -> &'static str {
610 match self {
611 Self::AwaitingAttempt => "AwaitingAttempt",
612 Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
613 Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
614 }
615 }
616}
617
618enum AttemptProgressResult {
620 Finished {
621 funding_tx: Transaction,
622 vtxos: Vec<Vtxo<Full>>,
623 unlock_hash: UnlockHash,
624 },
625 Failed(anyhow::Error),
626 Updated {
632 new_state: AttemptState,
633 },
634 NotUpdated,
635}
636
637async fn start_attempt(
639 wallet: &Wallet,
640 participation: &RoundParticipation,
641 event: &RoundAttempt,
642) -> anyhow::Result<AttemptState> {
643 let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
644
645 let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
647 .take(participation.outputs.len())
648 .collect::<Vec<_>>();
649
650 let cosign_nonces = cosign_keys.iter()
653 .map(|key| {
654 let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
655 let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
656 for _ in 0..ark_info.nb_round_nonces {
657 let (s, p) = musig::nonce_pair(key);
658 secs.push(s);
659 pubs.push(p);
660 }
661 (secs, pubs)
662 })
663 .take(participation.outputs.len())
664 .collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
665
666
667 debug!("Submitting payment request with {} inputs and {} vtxo outputs",
669 participation.inputs.len(), participation.outputs.len(),
670 );
671
672 let unblinded_mailbox_id = wallet.mailbox_identifier();
674 let signed_reqs = participation.outputs.iter()
675 .zip(cosign_keys.iter())
676 .zip(cosign_nonces.iter())
677 .map(|((req, cosign_key), (_sec, pub_nonces))| {
678 SignedVtxoRequest {
679 vtxo: req.clone(),
680 cosign_pubkey: cosign_key.public_key(),
681 nonces: pub_nonces.clone(),
682 }
683 })
684 .collect::<Vec<_>>();
685
686 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
687 for vtxo in participation.inputs.iter() {
688 let keypair = wallet.get_vtxo_key(vtxo).await
689 .map_err(HarkForfeitError::Err)?;
690 input_vtxos.push(protos::InputVtxo {
691 vtxo_id: vtxo.id().to_bytes().to_vec(),
692 attestation: {
693 let attestation = RoundAttemptAttestation::new(
694 event.challenge, vtxo.id(), &signed_reqs, &keypair,
695 );
696 attestation.serialize()
697 },
698 });
699 }
700
701 wallet.register_vtxo_transactions_with_server(&participation.inputs).await
703 .map_err(HarkForfeitError::Err)?;
704
705 let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
706 input_vtxos: input_vtxos,
707 vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
708 #[allow(deprecated)]
709 offboard_requests: vec![],
710 unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
711 }).await.context("Ark server refused our payment submission")?;
712
713 Ok(AttemptState::AwaitingUnsignedVtxoTree {
714 unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
715 cosign_keys: cosign_keys,
716 secret_nonces: cosign_nonces.into_iter()
717 .map(|(sec, _pub)| sec.into_iter()
718 .map(DangerousSecretNonce::dangerous_from_secret_nonce)
719 .collect())
720 .collect(),
721 })
722}
723
724#[derive(Debug, thiserror::Error)]
726enum HarkForfeitError {
727 #[error("error after forfeits were sent")]
729 SentForfeits(#[source] anyhow::Error),
730 #[error("error before forfeits were sent")]
732 Err(#[source] anyhow::Error),
733}
734
735async fn hark_cosign_leaf(
736 wallet: &Wallet,
737 srv: &mut ServerConnection,
738 funding_tx: &Transaction,
739 vtxo: &mut Vtxo<Full>,
740) -> anyhow::Result<()> {
741 let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
742 .context("error fetching keypair").map_err(HarkForfeitError::Err)?
743 .with_context(|| format!(
744 "keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
745 ))?.1;
746 let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
747 let cosign_resp = srv.client.request_leaf_vtxo_cosign(
748 protos::LeafVtxoCosignRequest::from(cosign_req),
749 ).await
750 .with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
751 .into_inner().try_into()
752 .context("bad leaf vtxo cosign response")?;
753 ensure!(ctx.finalize(vtxo, cosign_resp),
754 "failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
755 );
756
757 Ok(())
758}
759
760async fn hark_vtxo_swap(
770 wallet: &Wallet,
771 participation: &RoundParticipation,
772 output_vtxos: &mut [Vtxo<Full>],
773 funding_tx: &Transaction,
774 unlock_hash: UnlockHash,
775) -> Result<(), HarkForfeitError> {
776 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
777
778 wallet.register_vtxo_transactions_with_server(&participation.inputs).await
780 .context("couldn't send our input vtxo transactions to server")
781 .map_err(HarkForfeitError::Err)?;
782
783 for vtxo in output_vtxos.iter_mut() {
785 hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
786 .map_err(HarkForfeitError::Err)?;
787 }
788
789 let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
792 unlock_hash: unlock_hash.to_byte_array().to_vec(),
793 vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
794 }).await
795 .context("request forfeits nonces call failed")
796 .map_err(HarkForfeitError::Err)?
797 .into_inner().public_nonces.into_iter()
798 .map(|b| musig::PublicNonce::from_bytes(b))
799 .collect::<Result<Vec<_>, _>>()
800 .context("invalid forfeit nonces")
801 .map_err(HarkForfeitError::Err)?;
802
803 if server_nonces.len() != participation.inputs.len() {
804 return Err(HarkForfeitError::Err(anyhow!(
805 "server sent {} nonce pairs, expected {}",
806 server_nonces.len(), participation.inputs.len(),
807 )));
808 }
809
810 let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
811 for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
812 let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
813 .ok().flatten().with_context(|| format!(
814 "failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
815 )).map_err(HarkForfeitError::Err)?.1;
816 forfeit_bundles.push(HashLockedForfeitBundle::new(
817 input, unlock_hash, &user_key, &nonces,
818 ))
819 }
820
821 let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
822 forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
823 }).await
824 .context("forfeit vtxos call failed")
825 .map_err(HarkForfeitError::SentForfeits)?
826 .into_inner().unlock_preimage.as_slice().try_into()
827 .context("invalid preimage length")
828 .map_err(HarkForfeitError::SentForfeits)?;
829
830 for vtxo in output_vtxos.iter_mut() {
831 if !vtxo.provide_unlock_preimage(preimage) {
832 return Err(HarkForfeitError::SentForfeits(anyhow!(
833 "invalid preimage {} for vtxo {} with supposed unlock hash {}",
834 preimage.as_hex(), vtxo.id(), unlock_hash,
835 )));
836 }
837
838 vtxo.validate(&funding_tx).with_context(|| format!(
840 "new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
841 )).map_err(HarkForfeitError::SentForfeits)?;
842 }
843
844 Ok(())
845}
846
847fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
848 match vtxo.validate(funding_tx) {
849 Err(VtxoValidationError::GenesisTransition {
850 genesis_idx, genesis_len, transition_kind, ..
851 }) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
852 Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
853 vtxo.serialize_hex(),
854 )),
855 Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
856 }
857}
858
859fn check_round_matches_participation(
860 part: &RoundParticipation,
861 new_vtxos: &[Vtxo<Full>],
862 funding_tx: &Transaction,
863) -> anyhow::Result<()> {
864 ensure!(new_vtxos.len() == part.outputs.len(),
865 "unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
866 );
867
868 for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
869 ensure!(vtxo.amount() == req.amount,
870 "unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
871 );
872 ensure!(*vtxo.policy() == req.policy,
873 "unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
874 );
875
876 check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
878 }
879
880 Ok(())
881}
882
883async fn check_funding_tx_confirmations(
893 wallet: &Wallet,
894 funding_txid: Txid,
895 funding_tx: &Transaction,
896) -> anyhow::Result<bool> {
897 let tip = wallet.inner.chain.tip().await.context("chain source error")?;
898 let conf_height = tip - wallet.inner.config.round_tx_required_confirmations + 1;
899 let tx_status = wallet.inner.chain.tx_status(funding_txid).await.context("chain source error")?;
900 trace!("Round funding tx {} confirmation status: {:?} (tip={})",
901 funding_txid, tx_status, tip,
902 );
903 match tx_status {
904 TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
905 TxStatus::Mempool | TxStatus::Confirmed(_) => {
906 if wallet.inner.config.round_tx_required_confirmations == 0 {
907 debug!("Accepting round funding tx without confirmations because of configuration");
908 Ok(true)
909 } else {
910 trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
911 Ok(false)
912 }
913 },
914 TxStatus::NotFound => {
915 if let Err(e) = wallet.inner.chain.broadcast_tx(&funding_tx).await {
920 Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
921 funding_txid, serialize_hex(funding_tx), e,
922 ))
923 } else {
924 trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
925 Ok(false)
926 }
927 },
928 }
929}
930
931enum HarkProgressResult {
932 RoundPending,
933 RoundNotFound,
934 FundingTxUnconfirmed {
935 funding_txid: Txid,
936 },
937 Ok {
938 funding_tx: Transaction,
939 new_vtxos: Vec<Vtxo<Full>>,
940 },
941}
942
943async fn progress_non_interactive(
944 wallet: &Wallet,
945 participation: &RoundParticipation,
946 unlock_hash: UnlockHash,
947) -> Result<HarkProgressResult, HarkForfeitError> {
948 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
949
950 let resp = match srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
951 unlock_hash: unlock_hash.to_byte_array().to_vec(),
952 }).await {
953 Ok(resp) => resp.into_inner(),
954 Err(err) if err.code() == tonic::Code::NotFound => {
955 return Ok(HarkProgressResult::RoundNotFound);
956 },
957 Err(err) => {
958 return Err(HarkForfeitError::Err(
959 anyhow::Error::from(err).context("error checking round participation status"),
960 ));
961 },
962 };
963 let status = protos::RoundParticipationStatus::try_from(resp.status)
964 .context("unknown status from server")
965 .map_err(HarkForfeitError::Err) ?;
966
967 if status == protos::RoundParticipationStatus::RoundPartPending {
968 trace!("Hark round still pending");
969 return Ok(HarkProgressResult::RoundPending);
970 }
971
972 if status == protos::RoundParticipationStatus::RoundPartReleased {
977 let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
978 warn!("Server says preimage was already released for hArk participation \
979 with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
980 );
981 }
982
983 let funding_tx_bytes = resp.round_funding_tx
984 .context("funding txid should be provided when status is not pending")
985 .map_err(HarkForfeitError::Err)?;
986 let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
987 .context("invalid funding txid")
988 .map_err(HarkForfeitError::Err)?;
989 let funding_txid = funding_tx.compute_txid();
990 trace!("Funding tx for round participation with unlock hash {}: {} ({})",
991 unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
992 );
993
994 match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
996 Ok(true) => {},
997 Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
998 Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
999 }
1000
1001 let mut new_vtxos = resp.output_vtxos.into_iter()
1002 .map(|v| <Vtxo<Full>>::deserialize(&v))
1003 .collect::<Result<Vec<_>, _>>()
1004 .context("invalid output VTXOs from server")
1005 .map_err(HarkForfeitError::Err)?;
1006
1007 check_round_matches_participation(participation, &new_vtxos, &funding_tx)
1009 .context("new VTXOs received from server don't match our participation")
1010 .map_err(HarkForfeitError::Err)?;
1011
1012 hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
1013 .context("error forfeiting hArk VTXOs")
1014 .map_err(HarkForfeitError::SentForfeits)?;
1015
1016 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
1017}
1018
1019async fn progress_attempt(
1020 state: &AttemptState,
1021 wallet: &Wallet,
1022 part: &RoundParticipation,
1023 event: &RoundEvent,
1024) -> AttemptProgressResult {
1025 match (state, event) {
1029
1030 (
1031 AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
1032 RoundEvent::VtxoProposal(e),
1033 ) => {
1034 trace!("Received VtxoProposal: {:#?}", e);
1035 match sign_vtxo_tree(
1036 wallet,
1037 part,
1038 &cosign_keys,
1039 &secret_nonces,
1040 &e.unsigned_round_tx,
1041 &e.vtxos_spec,
1042 &e.cosign_agg_nonces,
1043 ).await {
1044 Ok(()) => {
1045 AttemptProgressResult::Updated {
1046 new_state: AttemptState::AwaitingFinishedRound {
1047 unsigned_round_tx: e.unsigned_round_tx.clone(),
1048 vtxos_spec: e.vtxos_spec.clone(),
1049 unlock_hash: *unlock_hash,
1050 },
1051 }
1052 },
1053 Err(e) => {
1054 trace!("Error signing VTXO tree: {:#}", e);
1055 AttemptProgressResult::Failed(e)
1056 },
1057 }
1058 },
1059
1060 (
1061 AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1062 RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1063 ) => {
1064 if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1065 return AttemptProgressResult::Failed(anyhow!(
1066 "signed funding tx ({}) doesn't match tx received before ({})",
1067 signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1068 ));
1069 }
1070
1071 if let Err(e) = wallet.inner.chain.broadcast_tx(&signed_round_tx).await {
1072 warn!("Failed to broadcast signed round tx: {:#}", e);
1073 }
1074
1075 match construct_new_vtxos(
1076 part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1077 ).await {
1078 Ok(v) => AttemptProgressResult::Finished {
1079 funding_tx: signed_round_tx.clone(),
1080 vtxos: v,
1081 unlock_hash: *unlock_hash,
1082 },
1083 Err(e) => AttemptProgressResult::Failed(anyhow!(
1084 "failed to construct new VTXOs for round: {:#}", e,
1085 )),
1086 }
1087 },
1088
1089 (state, RoundEvent::Finished(RoundFinished { .. })) => {
1090 AttemptProgressResult::Failed(anyhow!(
1091 "unexpectedly received a finished round while we were in state {}",
1092 state.kind(),
1093 ))
1094 },
1095
1096 (state, _) => {
1097 trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1098 AttemptProgressResult::NotUpdated
1099 },
1100 }
1101}
1102
1103async fn sign_vtxo_tree(
1104 wallet: &Wallet,
1105 participation: &RoundParticipation,
1106 cosign_keys: &[Keypair],
1107 secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1108 unsigned_round_tx: &Transaction,
1109 vtxo_tree: &VtxoTreeSpec,
1110 cosign_agg_nonces: &[musig::AggregatedNonce],
1111) -> anyhow::Result<()> {
1112 let (srv, _) = wallet.require_server().await.context("server not available")?;
1113
1114 let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1115
1116 let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1118 for vtxo_req in vtxo_tree.iter_vtxos() {
1119 if let Some(i) = my_vtxos.iter().position(|v| {
1120 v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1121 }) {
1122 my_vtxos.swap_remove(i);
1123 }
1124 }
1125 if !my_vtxos.is_empty() {
1126 bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1127 }
1128
1129 let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1130 let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1131 trace!("Sending vtxo signatures to server...");
1132 let _ = try_join_all(iter.map(|((req, key), sec)| async {
1133 let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1134 let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1135 let part_sigs = unsigned_vtxos.cosign_branch(
1136 &cosign_agg_nonces, leaf_idx, key, secret_nonces,
1137 ).context("failed to cosign branch: our request not part of tree")?;
1138
1139 info!("Sending {} partial vtxo cosign signatures for pk {}",
1140 part_sigs.len(), key.public_key(),
1141 );
1142
1143 let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1144 pubkey: key.public_key().serialize().to_vec(),
1145 signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1146 }).await.context("error sending vtxo signatures")?;
1147
1148 Result::<(), anyhow::Error>::Ok(())
1149 })).await.context("error sending VTXO signatures")?;
1150 trace!("Done sending vtxo signatures to server");
1151
1152 Ok(())
1153}
1154
1155async fn construct_new_vtxos(
1156 participation: &RoundParticipation,
1157 unsigned_round_tx: &Transaction,
1158 vtxo_tree: &VtxoTreeSpec,
1159 vtxo_cosign_sigs: &[schnorr::Signature],
1160) -> anyhow::Result<Vec<Vtxo<Full>>> {
1161 let round_txid = unsigned_round_tx.compute_txid();
1162 let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1163 let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1164
1165 if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1167 bail!("Received incorrect vtxo cosign signatures from server");
1169 }
1170
1171 let signed_vtxos = vtxo_tree
1172 .into_signed_tree(vtxo_cosign_sigs.to_vec())
1173 .into_cached_tree();
1174
1175 let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1176 let total_nb_expected_vtxos = expected_vtxos.len();
1177
1178 let mut new_vtxos = vec![];
1179 for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1180 if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1181 let vtxo = signed_vtxos.build_vtxo(idx);
1182
1183 check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1186 .context("constructed invalid vtxo from tree")?;
1187
1188 info!("New VTXO from round: {} ({}, {})",
1189 vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1190 );
1191
1192 new_vtxos.push(vtxo);
1193 expected_vtxos.swap_remove(expected_idx);
1194 }
1195 }
1196
1197 if !expected_vtxos.is_empty() {
1198 if expected_vtxos.len() == total_nb_expected_vtxos {
1199 bail!("None of our VTXOs were present in round!");
1201 } else {
1202 bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1203 expected_vtxos.len(), expected_vtxos,
1204 );
1205 }
1206 }
1207 Ok(new_vtxos)
1208}
1209
1210async fn persist_round_success(
1212 wallet: &Wallet,
1213 participation: &RoundParticipation,
1214 movement_id: Option<MovementId>,
1215 new_vtxos: &[Vtxo<Full>],
1216 funding_tx: &Transaction,
1217) -> anyhow::Result<()> {
1218 debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1219 new_vtxos.len(), movement_id,
1220 );
1221
1222 let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1226 .context("failed to store new VTXOs");
1227 let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1228 .context("failed to mark input VTXOs as spent");
1229 let update_result = if let Some(mid) = movement_id {
1230 wallet.inner.movements.finish_movement_with_update(
1231 mid,
1232 MovementStatus::Successful,
1233 MovementUpdate::new()
1234 .produced_vtxos(new_vtxos)
1235 .metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1236 ).await.context("failed to mark movement as finished")
1237 } else {
1238 Ok(())
1239 };
1240
1241 store_result?;
1242 spent_result?;
1243 update_result?;
1244
1245 Ok(())
1246}
1247
1248async fn persist_round_failure(
1249 wallet: &Wallet,
1250 participation: &RoundParticipation,
1251 movement_id: Option<MovementId>,
1252) -> anyhow::Result<()> {
1253 debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1254 let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1255 let finish_result = if let Some(movement_id) = movement_id {
1256 wallet.inner.movements.finish_movement(movement_id, MovementStatus::Failed).await
1257 } else {
1258 Ok(())
1259 };
1260 if let Err(e) = &finish_result {
1261 error!("Failed to mark movement as failed: {:#}", e);
1262 }
1263 match (unlock_result, finish_result) {
1264 (Ok(()), Ok(())) => Ok(()),
1265 (Err(e), _) => Err(e),
1266 (_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1267 }
1268}
1269
1270async fn update_funding_txid(
1271 wallet: &Wallet,
1272 movement_id: MovementId,
1273 funding_txid: Txid,
1274) -> anyhow::Result<()> {
1275 wallet.inner.movements.update_movement(
1276 movement_id,
1277 MovementUpdate::new()
1278 .metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1279 ).await.context("Unable to update funding txid of round")
1280}
1281
1282impl Wallet {
1283 pub async fn lock_wait_round_state(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState>> {
1288 let guard = self.inner.lock_manager.lock(
1289 &format!("{}.round.{}", self.fingerprint(), id),
1290 ROUND_LOCK_TIMEOUT,
1291 ).await.with_context(|| format!(
1292 "timed out waiting for lock on round state {} (wallet {})",
1293 id, self.fingerprint(),
1294 ))?;
1295
1296 if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1297 return Ok(Some(state.lock(guard)));
1298 }
1299
1300 Ok(None)
1301 }
1302
1303 pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1305 let (mut srv, _) = self.require_server().await?;
1306 let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1307 Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1308 }
1309
1310 pub async fn join_next_round(
1319 &self,
1320 participation: RoundParticipation,
1321 movement_kind: Option<RoundMovement>,
1322 ) -> anyhow::Result<StoredRoundState> {
1323 let movement_id = if let Some(kind) = movement_kind {
1324 Some(self.inner.movements.new_movement_with_update(
1325 Subsystem::ROUND,
1326 kind.to_string(),
1327 participation.to_movement_update()?
1328 ).await?)
1329 } else {
1330 None
1331 };
1332 let state = RoundState::new_interactive(participation, movement_id);
1333
1334 let id = self.inner.db.store_round_state_lock_vtxos(&state).await?;
1335 let state = self.lock_wait_round_state(id).await?
1336 .context("failed to lock fresh round state")?;
1337
1338 Ok(state)
1339 }
1340
1341 pub async fn join_next_round_delegated(
1343 &self,
1344 participation: RoundParticipation,
1345 movement_kind: Option<RoundMovement>,
1346 ) -> anyhow::Result<StoredRoundState<Unlocked>> {
1347 let (mut srv, _) = self.require_server().await?;
1348
1349 let movement_id = if let Some(kind) = movement_kind {
1350 Some(self.inner.movements.new_movement_with_update(
1351 Subsystem::ROUND, kind.to_string(), participation.to_movement_update()?,
1352 ).await?)
1353 } else {
1354 None
1355 };
1356
1357 let unblinded_mailbox_id = self.mailbox_identifier();
1359
1360 self.register_vtxo_transactions_with_server(&participation.inputs).await
1362 .context("failed to register input vtxo transactions with server")?;
1363
1364 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1366 for vtxo in participation.inputs.iter() {
1367 let keypair = self.get_vtxo_key(vtxo).await
1368 .context("failed to get vtxo keypair")?;
1369 input_vtxos.push(protos::InputVtxo {
1370 vtxo_id: vtxo.id().to_bytes().to_vec(),
1371 attestation: {
1372 let attestation = DelegatedRoundParticipationAttestation::new(
1373 vtxo.id(), &participation.outputs, &keypair,
1374 );
1375 attestation.serialize()
1376 },
1377 });
1378 }
1379
1380 let vtxo_requests = participation.outputs.iter()
1382 .map(|req|
1383 protos::VtxoRequest {
1384 policy: req.policy.serialize(),
1385 amount: req.amount.to_sat(),
1386 })
1387 .collect::<Vec<_>>();
1388
1389 let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1391 input_vtxos,
1392 vtxo_requests,
1393 unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
1394 }).await.context("error submitting round participation to server")?.into_inner();
1395
1396 let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1397 .context("invalid unlock hash from server")?;
1398
1399 let state = RoundState::new_non_interactive(participation, unlock_hash, movement_id);
1400
1401 info!("Non-interactive round participation submitted, it will automatically execute \
1402 when you next sync your wallet after the round happened \
1403 (and has sufficient confirmations).",
1404 );
1405
1406 let id = self.inner.db.store_round_state_lock_vtxos(&state).await?;
1407 Ok(StoredRoundState::new(id, state))
1408 }
1409
1410 pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1412 self.inner.db.get_pending_round_state_ids().await
1413 }
1414
1415 pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1417 let ids = self.inner.db.get_pending_round_state_ids().await?;
1418 let mut states = Vec::with_capacity(ids.len());
1419 for id in ids {
1420 if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1421 states.push(state);
1422 }
1423 }
1424 Ok(states)
1425 }
1426
1427 pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1429 let mut ret = Amount::ZERO;
1430 for round in self.pending_round_states().await? {
1431 ret += round.state().pending_balance();
1432 }
1433 Ok(ret)
1434 }
1435
1436 pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1441 let mut ret = Vec::new();
1442 for round in self.pending_round_states().await? {
1443 let inputs = round.state().locked_pending_inputs();
1444 ret.reserve(inputs.len());
1445 for input in inputs {
1446 let v = self.get_vtxo_by_id(input.id()).await
1447 .context("unknown round input VTXO")?;
1448 ret.push(v);
1449 }
1450 }
1451 Ok(ret)
1452 }
1453
1454 pub async fn sync_pending_rounds(&self) -> anyhow::Result<HashMap<RoundStateId, RoundStatus>> {
1456 let states = self.pending_round_states().await?;
1457 if states.is_empty() {
1458 return Ok(HashMap::new());
1459 }
1460
1461 debug!("Syncing {} pending round states...", states.len());
1462
1463 let ret = Arc::new(parking_lot::Mutex::new(HashMap::with_capacity(states.len())));
1464 tokio_stream::iter(states).for_each_concurrent(10, |state| {
1465 let ret = ret.clone();
1466 async move {
1467 if state.state().ongoing_participation() {
1469 return;
1470 }
1471
1472 let mut state = match self.lock_wait_round_state(state.id()).await {
1473 Ok(Some(state)) => state,
1474 Ok(None) => return,
1475 Err(e) => {
1476 warn!("Error locking round state: {:#}", e);
1477 return;
1478 },
1479 };
1480
1481 let status = match state.state_mut().sync(self).await {
1482 Ok(s) => s,
1483 Err(e) => {
1484 warn!("Error syncing round: {:#}", e);
1485 return;
1486 },
1487 };
1488 trace!("Synced round #{}, status: {:?}", state.id(), status);
1489 match status {
1490 RoundStatus::Confirmed { funding_txid } => {
1491 info!("Round confirmed. Funding tx {}", funding_txid);
1492 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1493 warn!("Error removing confirmed round state from db: {:#}", e);
1494 }
1495 },
1496 RoundStatus::Unconfirmed { funding_txid } => {
1497 info!("Waiting for confirmations for round funding tx {}", funding_txid);
1498 if let Err(e) = self.inner.db.update_round_state(&state).await {
1499 warn!("Error updating pending round state in db: {:#}", e);
1500 }
1501 },
1502 RoundStatus::Pending => {
1503 if let Err(e) = self.inner.db.update_round_state(&state).await {
1504 warn!("Error updating pending round state in db: {:#}", e);
1505 }
1506 },
1507 RoundStatus::Failed { ref error } => {
1508 error!("Round failed: {}", error);
1509 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1510 warn!("Error removing failed round state from db: {:#}", e);
1511 }
1512 },
1513 RoundStatus::Canceled => {
1514 error!("Round canceled");
1515 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1516 warn!("Error removing canceled round state from db: {:#}", e);
1517 }
1518 },
1519 }
1520 ret.lock().insert(state.id(), status);
1521 }
1522 }).await;
1523
1524 Ok(Arc::into_inner(ret).expect("only ref left").into_inner())
1525 }
1526
1527 async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1529 let (mut srv, _) = self.require_server().await?;
1530 let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1531 Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1532 }
1533
1534 async fn inner_process_event(
1535 &self,
1536 state: &mut StoredRoundState,
1537 event: Option<&RoundEvent>,
1538 ) {
1539 if let Some(event) = event && state.state().ongoing_participation() {
1540 let updated = state.state_mut().process_event(self, &event).await;
1541 if updated {
1542 if let Err(e) = self.inner.db.update_round_state(&state).await {
1543 error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1544 }
1545 }
1546 }
1547
1548 match state.state_mut().sync(self).await {
1549 Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1550 Ok(s) if s.is_final() => {
1551 info!("Round #{} finished with result: {:?}", state.id(), s);
1552 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1553 warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1554 }
1555 },
1556 Ok(s) => {
1557 trace!("Round state #{} is now in state {:?}", state.id(), s);
1558 if let Err(e) = self.inner.db.update_round_state(&state).await {
1559 warn!("Error storing round state #{}: {:#}", state.id(), e);
1560 }
1561 },
1562 }
1563 }
1564
1565 pub async fn progress_pending_rounds(
1570 &self,
1571 last_round_event: Option<&RoundEvent>,
1572 ) -> anyhow::Result<()> {
1573 let states = self.pending_round_states().await?;
1574 if states.is_empty() {
1575 return Ok(());
1576 }
1577
1578 info!("Processing {} rounds...", states.len());
1579
1580 let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1581
1582 let has_ongoing_participation = states.iter()
1583 .any(|s| s.state().ongoing_participation());
1584 if has_ongoing_participation && last_round_event.is_none() {
1585 match self.get_last_round_event().await {
1586 Ok(e) => last_round_event = Some(Cow::Owned(e)),
1587 Err(e) => {
1588 warn!("Error fetching round event, \
1589 failed to progress ongoing rounds: {:#}", e);
1590 },
1591 }
1592 }
1593
1594 let event = last_round_event.as_ref().map(|c| c.as_ref());
1595
1596 let futs = states.into_iter().map(async |state| {
1597 let locked = self.lock_wait_round_state(state.id()).await?;
1598 if let Some(mut locked) = locked {
1599 self.inner_process_event(&mut locked, event).await;
1600 }
1601 Ok::<_, anyhow::Error>(())
1602 });
1603
1604 futures::future::join_all(futs).await;
1605
1606 Ok(())
1607 }
1608
1609 pub async fn subscribe_round_events(&self)
1610 -> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1611 {
1612 let (mut srv, _) = self.require_server().await?;
1613 let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1614 .into_inner().map(|m| {
1615 let m = m.context("received error on event stream")?;
1616 let e = RoundEvent::try_from(m.clone())
1617 .with_context(|| format!("error converting rpc round event: {:?}", m))?;
1618 trace!("Received round event: {}", e);
1619 Ok::<_, anyhow::Error>(e)
1620 });
1621 Ok(events)
1622 }
1623
1624 pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1629 let mut events = self.subscribe_round_events().await?;
1630
1631 loop {
1632 let state_ids = self.pending_round_states().await?.iter()
1635 .filter(|s| s.state().ongoing_participation())
1636 .map(|s| s.id())
1637 .collect::<Vec<_>>();
1638
1639 if state_ids.is_empty() {
1640 info!("All rounds handled");
1641 return Ok(());
1642 }
1643
1644 let event = events.next().await
1645 .context("events stream broke")?
1646 .context("error on event stream")?;
1647
1648 let futs = state_ids.into_iter().map(async |state| {
1649 let locked = self.lock_wait_round_state(state).await?;
1650 if let Some(mut locked) = locked {
1651 self.inner_process_event(&mut locked, Some(&event)).await;
1652 }
1653 Ok::<_, anyhow::Error>(())
1654 });
1655
1656 futures::future::join_all(futs).await;
1657 }
1658 }
1659
1660 pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1665 let state_ids = self.inner.db.get_pending_round_state_ids().await?;
1667
1668 let futures = state_ids.into_iter().map(|state_id| {
1669 async move {
1670 let mut state = match self.lock_wait_round_state(state_id).await {
1672 Ok(Some(s)) => s,
1673 Ok(None) => return,
1674 Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1675 };
1676
1677 match state.state_mut().try_cancel(self).await {
1678 Ok(true) => {
1679 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1680 warn!("Error removing canceled round state from db: {:#}", e);
1681 }
1682 },
1683 Ok(false) => {},
1684 Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1685 }
1686 }
1687 });
1688
1689 join_all(futures).await;
1690
1691 Ok(())
1692 }
1693
1694 pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1696 let mut state = self.lock_wait_round_state(id).await?
1697 .context("round state not found")?;
1698
1699 if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1700 self.inner.db.remove_round_state(&state).await
1701 .context("error removing canceled round state from db")?;
1702 } else {
1703 bail!("failed to cancel round");
1704 }
1705
1706 Ok(())
1707 }
1708
1709 pub(crate) async fn participate_round(
1716 &self,
1717 participation: RoundParticipation,
1718 movement_kind: Option<RoundMovement>,
1719 ) -> anyhow::Result<RoundStatus> {
1720 let mut state = self.join_next_round(participation, movement_kind).await?;
1721
1722 info!("Waiting for a round start...");
1723 let mut events = self.subscribe_round_events().await?;
1724
1725 loop {
1726 if !state.state().ongoing_participation() {
1727 let status = state.state_mut().sync(self).await?;
1728 match status {
1729 RoundStatus::Failed { error } => bail!("round failed: {}", error),
1730 RoundStatus::Canceled => bail!("round canceled"),
1731 status => return Ok(status),
1732 }
1733 }
1734
1735 let event = events.next().await
1736 .context("events stream broke")?
1737 .context("error on event stream")?;
1738 if state.state_mut().process_event(self, &event).await {
1739 self.inner.db.update_round_state(&state).await?;
1740 }
1741 }
1742 }
1743}