1use std::collections::HashMap;
6use std::iter;
7use std::borrow::Cow;
8use std::convert::Infallible;
9use std::sync::Arc;
10use std::time::{Duration, SystemTime, UNIX_EPOCH};
11
12use anyhow::Context;
13use ark::vtxo::VtxoValidationError;
14use bdk_esplora::esplora_client::Amount;
15use bip39::rand;
16use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
17use bitcoin::consensus::encode::{deserialize, serialize_hex};
18use bitcoin::hashes::Hash;
19use bitcoin::hex::DisplayHex;
20use bitcoin::key::Keypair;
21use bitcoin::secp256k1::schnorr;
22use futures::future::{join_all, try_join_all};
23use futures::{Stream, StreamExt};
24use log::{debug, error, info, trace, warn};
25
26use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
27use ark::vtxo::Full;
28use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
29use ark::forfeit::HashLockedForfeitBundle;
30use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
31use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
32use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
33use bitcoin_ext::TxStatus;
34use server_rpc::{protos, ServerConnection, TryFromBytes};
35
36use crate::movement::manager::OnDropStatus;
37use crate::{SECP, Wallet, WalletVtxo};
38use crate::movement::{MovementId, MovementStatus};
39use crate::movement::update::MovementUpdate;
40use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
41
42const ROUND_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
45use crate::subsystem::{RoundMovement, Subsystem};
46
47
48const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct RoundParticipation {
54 #[serde(with = "ark::encode::serde::vec")]
55 pub inputs: Vec<Vtxo<Full>>,
56 pub outputs: Vec<VtxoRequest>,
59 #[serde(default, skip_serializing_if = "Option::is_none", with = "ark::encode::serde::opt")]
61 pub unblinded_mailbox_id: Option<ark::mailbox::MailboxIdentifier>,
62}
63
64impl RoundParticipation {
65 pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
66 let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
67 let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
68 let fee = input_amount - output_amount;
69 Ok(MovementUpdate::new()
70 .consumed_vtxos(&self.inputs)
71 .intended_balance(SignedAmount::ZERO)
72 .effective_balance( - fee.to_signed()?)
73 .fee(fee)
74 )
75 }
76}
77
78#[derive(Debug, Clone)]
79pub enum RoundStatus {
80 Confirmed {
82 funding_txid: Txid,
83 },
84 Unconfirmed {
86 funding_txid: Txid,
87 },
88 Pending,
90 Failed {
92 error: String,
93 },
94 Canceled,
96}
97
98impl RoundStatus {
99 pub fn is_final(&self) -> bool {
101 match self {
102 Self::Confirmed { .. } => true,
103 Self::Unconfirmed { .. } => false,
104 Self::Pending => false,
105 Self::Failed { .. } => true,
106 Self::Canceled => true,
107 }
108 }
109
110 pub fn is_success(&self) -> bool {
112 match self {
113 Self::Confirmed { .. } => true,
114 Self::Unconfirmed { .. } => true,
115 Self::Pending => false,
116 Self::Failed { .. } => false,
117 Self::Canceled => false,
118 }
119 }
120}
121
122pub struct RoundState {
135 pub(crate) done: bool,
137
138 pub(crate) participation: RoundParticipation,
140
141 pub(crate) flow: RoundFlowState,
143
144 pub(crate) new_vtxos: Vec<Vtxo<Full>>,
149
150 pub(crate) sent_forfeit_sigs: bool,
157
158 pub(crate) movement_id: Option<MovementId>,
160}
161
162impl RoundState {
163 fn new_interactive(
164 participation: RoundParticipation,
165 movement_id: Option<MovementId>,
166 ) -> Self {
167 Self {
168 participation,
169 movement_id,
170 flow: RoundFlowState::InteractivePending,
171 new_vtxos: Vec::new(),
172 sent_forfeit_sigs: false,
173 done: false,
174 }
175 }
176
177 fn new_delegated(
178 participation: RoundParticipation,
179 unlock_hash: UnlockHash,
180 movement_id: Option<MovementId>,
181 ) -> Self {
182 Self {
183 participation,
184 movement_id,
185 flow: RoundFlowState::NonInteractivePending { unlock_hash },
186 new_vtxos: Vec::new(),
187 sent_forfeit_sigs: false,
188 done: false,
189 }
190 }
191
192 pub fn participation(&self) -> &RoundParticipation {
194 &self.participation
195 }
196
197 pub fn unlock_hash(&self) -> Option<UnlockHash> {
199 match self.flow {
200 RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
201 RoundFlowState::InteractivePending => None,
202 RoundFlowState::InteractiveOngoing { .. } => None,
203 RoundFlowState::Failed { .. } => None,
204 RoundFlowState::Canceled => None,
205 RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
206 }
207 }
208
209 pub fn funding_tx(&self) -> Option<&Transaction> {
210 match self.flow {
211 RoundFlowState::NonInteractivePending { .. } => None,
212 RoundFlowState::InteractivePending => None,
213 RoundFlowState::InteractiveOngoing { .. } => None,
214 RoundFlowState::Failed { .. } => None,
215 RoundFlowState::Canceled => None,
216 RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
217 }
218 }
219
220 pub fn ongoing_participation(&self) -> bool {
222 match self.flow {
223 RoundFlowState::NonInteractivePending { .. } => false,
224 RoundFlowState::InteractivePending => true,
225 RoundFlowState::InteractiveOngoing { .. } => true,
226 RoundFlowState::Failed { .. } => false,
227 RoundFlowState::Canceled => false,
228 RoundFlowState::Finished { .. } => false,
229 }
230 }
231
232 pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
235 let ret = match self.flow {
236 RoundFlowState::NonInteractivePending { .. } => {
237 bail!("it is currently not yet possible to cancel pending delegated rounds");
239 },
240 RoundFlowState::Canceled => true,
241 RoundFlowState::Failed { .. } => true,
242 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
243 self.flow = RoundFlowState::Canceled;
244 true
245 },
246 RoundFlowState::Finished { .. } => false,
247 };
248 if ret {
249 persist_round_failure(wallet, &self.participation, self.movement_id).await
250 .context("failed to persist round failure for cancelation")?;
251 }
252 Ok(ret)
253 }
254
255 async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
256 match start_attempt(wallet, &self.participation, attempt).await {
257 Ok(state) => {
258 self.flow = RoundFlowState::InteractiveOngoing {
259 round_seq: attempt.round_seq,
260 attempt_seq: attempt.attempt_seq,
261 state: state,
262 };
263 },
264 Err(e) => {
265 self.flow = RoundFlowState::Failed {
266 error: format!("{:#}", e),
267 };
268 },
269 }
270 }
271
272 pub async fn process_event(
274 &mut self,
275 wallet: &Wallet,
276 event: &RoundEvent,
277 ) -> bool {
278 let _: Infallible = match self.flow {
279 RoundFlowState::InteractivePending => {
280 if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
281 trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
282 self.try_start_attempt(wallet, e).await;
283 return true;
284 } else {
285 trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
286 event.kind(), event.round_seq(), event.attempt_seq(),
287 );
288 return false;
289 }
290 },
291 RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
292 if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
295 warn!("Round {} failed by server", round_seq);
296 self.flow = RoundFlowState::Failed {
297 error: format!("round {} failed by server", round_seq),
298 };
299 return true;
300 }
301
302 if event.round_seq() > round_seq {
303 self.flow = RoundFlowState::Failed {
306 error: format!("round {} started while we were on {}",
307 event.round_seq(), round_seq,
308 ),
309 };
310 return true;
311 }
312
313 if event.attempt_seq() < attempt_seq {
314 trace!("ignoring replayed message from old attempt");
315 return false;
316 }
317
318 if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
319 trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
320 self.try_start_attempt(wallet, e).await;
321 return true;
322 }
323 trace!("Processing event {} for round attempt {}:{} in state {}",
324 event.kind(), round_seq, attempt_seq, state.kind(),
325 );
326
327 return match progress_attempt(state, wallet, &self.participation, event).await {
328 AttemptProgressResult::NotUpdated => false,
329 AttemptProgressResult::Updated { new_state } => {
330 *state = new_state;
331 true
332 },
333 AttemptProgressResult::Failed(e) => {
334 warn!("Round failed with error: {:#}", e);
335 self.flow = RoundFlowState::Failed {
336 error: format!("{:#}", e),
337 };
338 true
339 },
340 AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
341 self.new_vtxos = vtxos;
342 let funding_txid = funding_tx.compute_txid();
343 self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
344 if let Some(mid) = self.movement_id {
345 if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
346 warn!("Error updating the round funding txid: {:#}", e);
347 }
348 }
349 true
350 },
351 };
352 },
353 RoundFlowState::NonInteractivePending { .. }
354 | RoundFlowState::Finished { .. }
355 | RoundFlowState::Failed { .. }
356 | RoundFlowState::Canceled => return false,
357 };
358 }
359
360 pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
365 match self.flow {
366 RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
367 Ok(RoundStatus::Confirmed {
368 funding_txid: funding_tx.compute_txid(),
369 })
370 },
371
372 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
373 Ok(RoundStatus::Pending)
374 },
375 RoundFlowState::Failed { ref error } => {
376 persist_round_failure(wallet, &self.participation, self.movement_id).await
377 .context("failed to persist round failure")?;
378 Ok(RoundStatus::Failed { error: error.clone() })
379 },
380 RoundFlowState::Canceled => {
381 persist_round_failure(wallet, &self.participation, self.movement_id).await
382 .context("failed to persist round failure")?;
383 Ok(RoundStatus::Canceled)
384 },
385
386 RoundFlowState::NonInteractivePending { unlock_hash } => {
387 match progress_delegated(wallet, &self.participation, unlock_hash).await {
388 Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
389 Ok(HarkProgressResult::RoundNotFound) => {
390 self.handle_round_not_found(wallet).await
391 },
392 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
393 let funding_txid = funding_tx.compute_txid();
394 self.new_vtxos = new_vtxos;
395 self.flow = RoundFlowState::Finished {
396 funding_tx: funding_tx.clone(),
397 unlock_hash: unlock_hash,
398 };
399
400 persist_round_success(
401 wallet,
402 &self.participation,
403 self.movement_id,
404 &self.new_vtxos,
405 &funding_tx,
406 ).await.context("failed to store successful round in DB!")?;
407
408 self.done = true;
409
410 Ok(RoundStatus::Confirmed { funding_txid })
411 },
412 Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
413 if let Some(mid) = self.movement_id {
414 update_funding_txid(wallet, mid, funding_txid).await
415 .context("failed to update funding txid in DB")?;
416 }
417 Ok(RoundStatus::Unconfirmed { funding_txid })
418 },
419
420 Err(HarkForfeitError::Err(e)) => {
423 Err(e.context("error progressing delegated round"))
427 },
428 Err(HarkForfeitError::SentForfeits(e)) => {
429 self.sent_forfeit_sigs = true;
430 Err(e.context("error progressing delegated round \
431 after sending forfeit tx signatures"))
432 },
433 }
434 },
435 RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
437 let funding_txid = funding_tx.compute_txid();
438 let confirmed = check_funding_tx_confirmations(
439 wallet, funding_txid, &funding_tx,
440 ).await.context("error checking funding tx confirmations")?;
441 if !confirmed {
442 trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
443 return Ok(RoundStatus::Unconfirmed { funding_txid });
444 }
445
446 match hark_vtxo_swap(
447 wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
448 ).await {
449 Ok(()) => {
450 persist_round_success(
451 wallet,
452 &self.participation,
453 self.movement_id,
454 &self.new_vtxos,
455 &funding_tx,
456 ).await.context("failed to store successful round in DB!")?;
457
458 self.done = true;
459
460 Ok(RoundStatus::Confirmed { funding_txid })
461 },
462 Err(HarkForfeitError::Err(e)) => {
463 Err(e.context("error forfeiting VTXOs after round"))
464 },
465 Err(HarkForfeitError::SentForfeits(e)) => {
466 self.sent_forfeit_sigs = true;
467 Err(e.context("error after having signed and sent \
468 forfeit signatures to server"))
469 },
470 }
471 },
472 }
473 }
474
475 pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
478 if self.new_vtxos.is_empty() {
479 None
480 } else {
481 Some(&self.new_vtxos)
482 }
483 }
484
485 pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
488 match self.flow {
490 RoundFlowState::NonInteractivePending { .. }
491 | RoundFlowState::InteractivePending
492 | RoundFlowState::InteractiveOngoing { .. }
493 => {
494 &self.participation.inputs
495 },
496 RoundFlowState::Finished { .. } => if self.done {
497 &[]
499 } else {
500 &self.participation.inputs
501 },
502 RoundFlowState::Failed { .. }
503 | RoundFlowState::Canceled
504 => {
505 &[]
507 },
508 }
509 }
510
511 pub fn pending_balance(&self) -> Amount {
515 if self.done {
516 return Amount::ZERO;
517 }
518
519 match self.flow {
520 RoundFlowState::NonInteractivePending { .. }
521 | RoundFlowState::InteractivePending
522 | RoundFlowState::InteractiveOngoing { .. }
523 | RoundFlowState::Finished { .. }
524 => {
525 self.participation.outputs.iter().map(|o| o.amount).sum()
526 },
527 RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
528 Amount::ZERO
529 },
530 }
531 }
532
533 async fn handle_round_not_found(
540 &mut self,
541 wallet: &Wallet,
542 ) -> anyhow::Result<RoundStatus> {
543 info!("Server reports round participation not found (no forfeits sent)");
544 self.flow = RoundFlowState::Failed {
545 error: "server reports round participation not found".into(),
546 };
547 persist_round_failure(wallet, &self.participation, self.movement_id).await
548 .context("failed to persist round failure")?;
549
550 Ok(RoundStatus::Failed {
551 error: "server reports round participation not found".into(),
552 })
553 }
554}
555
556pub enum RoundFlowState {
561 NonInteractivePending {
563 unlock_hash: UnlockHash,
564 },
565
566 InteractivePending,
568 InteractiveOngoing {
570 round_seq: RoundSeq,
571 attempt_seq: usize,
572 state: AttemptState,
573 },
574
575 Finished {
577 funding_tx: Transaction,
578 unlock_hash: UnlockHash,
579 },
580
581 Failed {
583 error: String,
584 },
585
586 Canceled,
588}
589
590pub enum AttemptState {
595 AwaitingAttempt,
596 AwaitingUnsignedVtxoTree {
597 cosign_keys: Vec<Keypair>,
598 secret_nonces: Vec<Vec<DangerousSecretNonce>>,
599 unlock_hash: UnlockHash,
600 },
601 AwaitingFinishedRound {
602 unsigned_round_tx: Transaction,
603 vtxos_spec: VtxoTreeSpec,
604 unlock_hash: UnlockHash,
605 },
606}
607
608impl AttemptState {
609 fn kind(&self) -> &'static str {
611 match self {
612 Self::AwaitingAttempt => "AwaitingAttempt",
613 Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
614 Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
615 }
616 }
617}
618
619enum AttemptProgressResult {
621 Finished {
622 funding_tx: Transaction,
623 vtxos: Vec<Vtxo<Full>>,
624 unlock_hash: UnlockHash,
625 },
626 Failed(anyhow::Error),
627 Updated {
633 new_state: AttemptState,
634 },
635 NotUpdated,
636}
637
638async fn start_attempt(
640 wallet: &Wallet,
641 participation: &RoundParticipation,
642 event: &RoundAttempt,
643) -> anyhow::Result<AttemptState> {
644 let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
645
646 let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
648 .take(participation.outputs.len())
649 .collect::<Vec<_>>();
650
651 let cosign_nonces = cosign_keys.iter()
654 .map(|key| {
655 let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
656 let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
657 for _ in 0..ark_info.nb_round_nonces {
658 let (s, p) = musig::nonce_pair(key);
659 secs.push(s);
660 pubs.push(p);
661 }
662 (secs, pubs)
663 })
664 .take(participation.outputs.len())
665 .collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
666
667
668 debug!("Submitting payment request with {} inputs and {} vtxo outputs",
670 participation.inputs.len(), participation.outputs.len(),
671 );
672
673 let unblinded_mailbox_id = wallet.mailbox_identifier();
675 let signed_reqs = participation.outputs.iter()
676 .zip(cosign_keys.iter())
677 .zip(cosign_nonces.iter())
678 .map(|((req, cosign_key), (_sec, pub_nonces))| {
679 SignedVtxoRequest {
680 vtxo: req.clone(),
681 cosign_pubkey: cosign_key.public_key(),
682 nonces: pub_nonces.clone(),
683 }
684 })
685 .collect::<Vec<_>>();
686
687 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
688 for vtxo in participation.inputs.iter() {
689 let keypair = wallet.get_vtxo_key(vtxo).await
690 .map_err(HarkForfeitError::Err)?;
691 input_vtxos.push(protos::InputVtxo {
692 vtxo_id: vtxo.id().to_bytes().to_vec(),
693 attestation: {
694 let attestation = RoundAttemptAttestation::new(
695 event.challenge, vtxo.id(), &signed_reqs, &keypair,
696 );
697 attestation.serialize()
698 },
699 });
700 }
701
702 wallet.register_vtxo_transactions_with_server(&participation.inputs).await
704 .map_err(HarkForfeitError::Err)?;
705
706 let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
707 input_vtxos: input_vtxos,
708 vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
709 #[allow(deprecated)]
710 offboard_requests: vec![],
711 unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
712 }).await.context("Ark server refused our payment submission")?;
713
714 Ok(AttemptState::AwaitingUnsignedVtxoTree {
715 unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
716 cosign_keys: cosign_keys,
717 secret_nonces: cosign_nonces.into_iter()
718 .map(|(sec, _pub)| sec.into_iter()
719 .map(DangerousSecretNonce::dangerous_from_secret_nonce)
720 .collect())
721 .collect(),
722 })
723}
724
725#[derive(Debug, thiserror::Error)]
727enum HarkForfeitError {
728 #[error("error after forfeits were sent")]
730 SentForfeits(#[source] anyhow::Error),
731 #[error("error before forfeits were sent")]
733 Err(#[source] anyhow::Error),
734}
735
736async fn hark_cosign_leaf(
737 wallet: &Wallet,
738 srv: &mut ServerConnection,
739 funding_tx: &Transaction,
740 vtxo: &mut Vtxo<Full>,
741) -> anyhow::Result<()> {
742 let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
743 .context("error fetching keypair").map_err(HarkForfeitError::Err)?
744 .with_context(|| format!(
745 "keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
746 ))?.1;
747 let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
748 let cosign_resp = srv.client.request_leaf_vtxo_cosign(
749 protos::LeafVtxoCosignRequest::from(cosign_req),
750 ).await
751 .with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
752 .into_inner().try_into()
753 .context("bad leaf vtxo cosign response")?;
754 ensure!(ctx.finalize(vtxo, cosign_resp),
755 "failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
756 );
757
758 Ok(())
759}
760
761async fn hark_vtxo_swap(
771 wallet: &Wallet,
772 participation: &RoundParticipation,
773 output_vtxos: &mut [Vtxo<Full>],
774 funding_tx: &Transaction,
775 unlock_hash: UnlockHash,
776) -> Result<(), HarkForfeitError> {
777 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
778
779 wallet.register_vtxo_transactions_with_server(&participation.inputs).await
781 .context("couldn't send our input vtxo transactions to server")
782 .map_err(HarkForfeitError::Err)?;
783
784 for vtxo in output_vtxos.iter_mut() {
786 hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
787 .map_err(HarkForfeitError::Err)?;
788 }
789
790 let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
793 unlock_hash: unlock_hash.to_byte_array().to_vec(),
794 vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
795 }).await
796 .context("request forfeits nonces call failed")
797 .map_err(HarkForfeitError::Err)?
798 .into_inner().public_nonces.into_iter()
799 .map(|b| musig::PublicNonce::from_bytes(b))
800 .collect::<Result<Vec<_>, _>>()
801 .context("invalid forfeit nonces")
802 .map_err(HarkForfeitError::Err)?;
803
804 if server_nonces.len() != participation.inputs.len() {
805 return Err(HarkForfeitError::Err(anyhow!(
806 "server sent {} nonce pairs, expected {}",
807 server_nonces.len(), participation.inputs.len(),
808 )));
809 }
810
811 let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
812 for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
813 let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
814 .ok().flatten().with_context(|| format!(
815 "failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
816 )).map_err(HarkForfeitError::Err)?.1;
817 forfeit_bundles.push(HashLockedForfeitBundle::new(
818 input, unlock_hash, &user_key, &nonces,
819 ))
820 }
821
822 let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
823 forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
824 }).await
825 .context("forfeit vtxos call failed")
826 .map_err(HarkForfeitError::SentForfeits)?
827 .into_inner().unlock_preimage.as_slice().try_into()
828 .context("invalid preimage length")
829 .map_err(HarkForfeitError::SentForfeits)?;
830
831 for vtxo in output_vtxos.iter_mut() {
832 if !vtxo.provide_unlock_preimage(preimage) {
833 return Err(HarkForfeitError::SentForfeits(anyhow!(
834 "invalid preimage {} for vtxo {} with supposed unlock hash {}",
835 preimage.as_hex(), vtxo.id(), unlock_hash,
836 )));
837 }
838
839 vtxo.validate(&funding_tx).with_context(|| format!(
841 "new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
842 )).map_err(HarkForfeitError::SentForfeits)?;
843 }
844
845 Ok(())
846}
847
848fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
849 match vtxo.validate(funding_tx) {
850 Err(VtxoValidationError::GenesisTransition {
851 genesis_idx, genesis_len, transition_kind, ..
852 }) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
853 Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
854 vtxo.serialize_hex(),
855 )),
856 Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
857 }
858}
859
860fn check_round_matches_participation(
861 part: &RoundParticipation,
862 new_vtxos: &[Vtxo<Full>],
863 funding_tx: &Transaction,
864) -> anyhow::Result<()> {
865 ensure!(new_vtxos.len() == part.outputs.len(),
866 "unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
867 );
868
869 for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
870 ensure!(vtxo.amount() == req.amount,
871 "unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
872 );
873 ensure!(*vtxo.policy() == req.policy,
874 "unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
875 );
876
877 check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
879 }
880
881 Ok(())
882}
883
884async fn check_funding_tx_confirmations(
894 wallet: &Wallet,
895 funding_txid: Txid,
896 funding_tx: &Transaction,
897) -> anyhow::Result<bool> {
898 let tip = wallet.inner.chain.tip().await.context("chain source error")?;
899 let conf_height = tip - wallet.inner.config.round_tx_required_confirmations + 1;
900 let tx_status = wallet.inner.chain.tx_status(funding_txid).await.context("chain source error")?;
901 trace!("Round funding tx {} confirmation status: {:?} (tip={})",
902 funding_txid, tx_status, tip,
903 );
904 match tx_status {
905 TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
906 TxStatus::Mempool | TxStatus::Confirmed(_) => {
907 if wallet.inner.config.round_tx_required_confirmations == 0 {
908 debug!("Accepting round funding tx without confirmations because of configuration");
909 Ok(true)
910 } else {
911 trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
912 Ok(false)
913 }
914 },
915 TxStatus::NotFound => {
916 if let Err(e) = wallet.inner.chain.broadcast_tx(&funding_tx).await {
921 Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
922 funding_txid, serialize_hex(funding_tx), e,
923 ))
924 } else {
925 trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
926 Ok(false)
927 }
928 },
929 }
930}
931
932enum HarkProgressResult {
933 RoundPending,
934 RoundNotFound,
935 FundingTxUnconfirmed {
936 funding_txid: Txid,
937 },
938 Ok {
939 funding_tx: Transaction,
940 new_vtxos: Vec<Vtxo<Full>>,
941 },
942}
943
944async fn progress_delegated(
945 wallet: &Wallet,
946 participation: &RoundParticipation,
947 unlock_hash: UnlockHash,
948) -> Result<HarkProgressResult, HarkForfeitError> {
949 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
950
951 let resp = match srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
952 unlock_hash: unlock_hash.to_byte_array().to_vec(),
953 }).await {
954 Ok(resp) => resp.into_inner(),
955 Err(err) if err.code() == tonic::Code::NotFound => {
956 return Ok(HarkProgressResult::RoundNotFound);
957 },
958 Err(err) => {
959 return Err(HarkForfeitError::Err(
960 anyhow::Error::from(err).context("error checking round participation status"),
961 ));
962 },
963 };
964 let status = protos::RoundParticipationStatus::try_from(resp.status)
965 .context("unknown status from server")
966 .map_err(HarkForfeitError::Err) ?;
967
968 if status == protos::RoundParticipationStatus::RoundPartPending {
969 trace!("Hark round still pending");
970 return Ok(HarkProgressResult::RoundPending);
971 }
972
973 if status == protos::RoundParticipationStatus::RoundPartReleased {
978 let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
979 warn!("Server says preimage was already released for hArk participation \
980 with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
981 );
982 }
983
984 let funding_tx_bytes = resp.round_funding_tx
985 .context("funding txid should be provided when status is not pending")
986 .map_err(HarkForfeitError::Err)?;
987 let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
988 .context("invalid funding txid")
989 .map_err(HarkForfeitError::Err)?;
990 let funding_txid = funding_tx.compute_txid();
991 trace!("Funding tx for round participation with unlock hash {}: {} ({})",
992 unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
993 );
994
995 match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
997 Ok(true) => {},
998 Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
999 Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
1000 }
1001
1002 let mut new_vtxos = resp.output_vtxos.into_iter()
1003 .map(|v| <Vtxo<Full>>::deserialize(&v))
1004 .collect::<Result<Vec<_>, _>>()
1005 .context("invalid output VTXOs from server")
1006 .map_err(HarkForfeitError::Err)?;
1007
1008 check_round_matches_participation(participation, &new_vtxos, &funding_tx)
1010 .context("new VTXOs received from server don't match our participation")
1011 .map_err(HarkForfeitError::Err)?;
1012
1013 hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
1014 .context("error forfeiting hArk VTXOs")
1015 .map_err(HarkForfeitError::SentForfeits)?;
1016
1017 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
1018}
1019
1020async fn progress_attempt(
1021 state: &AttemptState,
1022 wallet: &Wallet,
1023 part: &RoundParticipation,
1024 event: &RoundEvent,
1025) -> AttemptProgressResult {
1026 match (state, event) {
1030
1031 (
1032 AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
1033 RoundEvent::VtxoProposal(e),
1034 ) => {
1035 trace!("Received VtxoProposal: {:#?}", e);
1036 match sign_vtxo_tree(
1037 wallet,
1038 part,
1039 &cosign_keys,
1040 &secret_nonces,
1041 &e.unsigned_round_tx,
1042 &e.vtxos_spec,
1043 &e.cosign_agg_nonces,
1044 ).await {
1045 Ok(()) => {
1046 AttemptProgressResult::Updated {
1047 new_state: AttemptState::AwaitingFinishedRound {
1048 unsigned_round_tx: e.unsigned_round_tx.clone(),
1049 vtxos_spec: e.vtxos_spec.clone(),
1050 unlock_hash: *unlock_hash,
1051 },
1052 }
1053 },
1054 Err(e) => {
1055 trace!("Error signing VTXO tree: {:#}", e);
1056 AttemptProgressResult::Failed(e)
1057 },
1058 }
1059 },
1060
1061 (
1062 AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1063 RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1064 ) => {
1065 if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1066 return AttemptProgressResult::Failed(anyhow!(
1067 "signed funding tx ({}) doesn't match tx received before ({})",
1068 signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1069 ));
1070 }
1071
1072 if let Err(e) = wallet.inner.chain.broadcast_tx(&signed_round_tx).await {
1073 warn!("Failed to broadcast signed round tx: {:#}", e);
1074 }
1075
1076 match construct_new_vtxos(
1077 part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1078 ).await {
1079 Ok(v) => AttemptProgressResult::Finished {
1080 funding_tx: signed_round_tx.clone(),
1081 vtxos: v,
1082 unlock_hash: *unlock_hash,
1083 },
1084 Err(e) => AttemptProgressResult::Failed(anyhow!(
1085 "failed to construct new VTXOs for round: {:#}", e,
1086 )),
1087 }
1088 },
1089
1090 (state, RoundEvent::Finished(RoundFinished { .. })) => {
1091 AttemptProgressResult::Failed(anyhow!(
1092 "unexpectedly received a finished round while we were in state {}",
1093 state.kind(),
1094 ))
1095 },
1096
1097 (state, _) => {
1098 trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1099 AttemptProgressResult::NotUpdated
1100 },
1101 }
1102}
1103
1104async fn sign_vtxo_tree(
1105 wallet: &Wallet,
1106 participation: &RoundParticipation,
1107 cosign_keys: &[Keypair],
1108 secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1109 unsigned_round_tx: &Transaction,
1110 vtxo_tree: &VtxoTreeSpec,
1111 cosign_agg_nonces: &[musig::AggregatedNonce],
1112) -> anyhow::Result<()> {
1113 let (srv, _) = wallet.require_server().await.context("server not available")?;
1114
1115 let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1116
1117 let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1119 for vtxo_req in vtxo_tree.iter_vtxos() {
1120 if let Some(i) = my_vtxos.iter().position(|v| {
1121 v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1122 }) {
1123 my_vtxos.swap_remove(i);
1124 }
1125 }
1126 if !my_vtxos.is_empty() {
1127 bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1128 }
1129
1130 let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1131 let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1132 trace!("Sending vtxo signatures to server...");
1133 let _ = try_join_all(iter.map(|((req, key), sec)| async {
1134 let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1135 let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1136 let part_sigs = unsigned_vtxos.cosign_branch(
1137 &cosign_agg_nonces, leaf_idx, key, secret_nonces,
1138 ).context("failed to cosign branch: our request not part of tree")?;
1139
1140 info!("Sending {} partial vtxo cosign signatures for pk {}",
1141 part_sigs.len(), key.public_key(),
1142 );
1143
1144 let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1145 pubkey: key.public_key().serialize().to_vec(),
1146 signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1147 }).await.context("error sending vtxo signatures")?;
1148
1149 Result::<(), anyhow::Error>::Ok(())
1150 })).await.context("error sending VTXO signatures")?;
1151 trace!("Done sending vtxo signatures to server");
1152
1153 Ok(())
1154}
1155
1156async fn construct_new_vtxos(
1157 participation: &RoundParticipation,
1158 unsigned_round_tx: &Transaction,
1159 vtxo_tree: &VtxoTreeSpec,
1160 vtxo_cosign_sigs: &[schnorr::Signature],
1161) -> anyhow::Result<Vec<Vtxo<Full>>> {
1162 let round_txid = unsigned_round_tx.compute_txid();
1163 let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1164 let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1165
1166 if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1168 bail!("Received incorrect vtxo cosign signatures from server");
1170 }
1171
1172 let signed_vtxos = vtxo_tree
1173 .into_signed_tree(vtxo_cosign_sigs.to_vec())
1174 .into_cached_tree();
1175
1176 let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1177 let total_nb_expected_vtxos = expected_vtxos.len();
1178
1179 let mut new_vtxos = vec![];
1180 for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1181 if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1182 let vtxo = signed_vtxos.build_vtxo(idx);
1183
1184 check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1187 .context("constructed invalid vtxo from tree")?;
1188
1189 info!("New VTXO from round: {} ({}, {})",
1190 vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1191 );
1192
1193 new_vtxos.push(vtxo);
1194 expected_vtxos.swap_remove(expected_idx);
1195 }
1196 }
1197
1198 if !expected_vtxos.is_empty() {
1199 if expected_vtxos.len() == total_nb_expected_vtxos {
1200 bail!("None of our VTXOs were present in round!");
1202 } else {
1203 bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1204 expected_vtxos.len(), expected_vtxos,
1205 );
1206 }
1207 }
1208 Ok(new_vtxos)
1209}
1210
1211async fn persist_round_success(
1213 wallet: &Wallet,
1214 participation: &RoundParticipation,
1215 movement_id: Option<MovementId>,
1216 new_vtxos: &[Vtxo<Full>],
1217 funding_tx: &Transaction,
1218) -> anyhow::Result<()> {
1219 debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1220 new_vtxos.len(), movement_id,
1221 );
1222
1223 let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1227 .context("failed to store new VTXOs");
1228 let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1229 .context("failed to mark input VTXOs as spent");
1230 let update_result = if let Some(mid) = movement_id {
1231 wallet.inner.movements.finish_movement_with_update(
1232 mid,
1233 MovementStatus::Successful,
1234 MovementUpdate::new()
1235 .produced_vtxos(new_vtxos)
1236 .metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1237 ).await.context("failed to mark movement as finished")
1238 } else {
1239 Ok(())
1240 };
1241
1242 store_result?;
1243 spent_result?;
1244 update_result?;
1245
1246 Ok(())
1247}
1248
1249async fn persist_round_failure(
1250 wallet: &Wallet,
1251 participation: &RoundParticipation,
1252 movement_id: Option<MovementId>,
1253) -> anyhow::Result<()> {
1254 debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1255 let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1256 let finish_result = if let Some(movement_id) = movement_id {
1257 wallet.inner.movements.finish_movement(movement_id, MovementStatus::Failed).await
1258 } else {
1259 Ok(())
1260 };
1261 if let Err(e) = &finish_result {
1262 error!("Failed to mark movement as failed: {:#}", e);
1263 }
1264 match (unlock_result, finish_result) {
1265 (Ok(()), Ok(())) => Ok(()),
1266 (Err(e), _) => Err(e),
1267 (_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1268 }
1269}
1270
1271async fn update_funding_txid(
1272 wallet: &Wallet,
1273 movement_id: MovementId,
1274 funding_txid: Txid,
1275) -> anyhow::Result<()> {
1276 wallet.inner.movements.update_movement(
1277 movement_id,
1278 MovementUpdate::new()
1279 .metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1280 ).await.context("Unable to update funding txid of round")
1281}
1282
1283impl Wallet {
1284 pub async fn lock_wait_round_state(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState>> {
1289 let guard = self.inner.lock_manager.lock(
1290 &format!("{}.round.{}", self.fingerprint(), id),
1291 ROUND_LOCK_TIMEOUT,
1292 ).await.with_context(|| format!(
1293 "timed out waiting for lock on round state {} (wallet {})",
1294 id, self.fingerprint(),
1295 ))?;
1296
1297 if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1298 return Ok(Some(state.lock(guard)));
1299 }
1300
1301 Ok(None)
1302 }
1303
1304 pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1306 let (mut srv, _) = self.require_server().await?;
1307 let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1308 Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1309 }
1310
1311 pub async fn join_next_round(
1320 &self,
1321 participation: RoundParticipation,
1322 movement_kind: Option<RoundMovement>,
1323 ) -> anyhow::Result<StoredRoundState> {
1324 let movement = if let Some(kind) = movement_kind {
1325 Some(self.inner.movements.new_guarded_movement_with_update(
1326 Subsystem::ROUND,
1327 kind.to_string(),
1328 OnDropStatus::Failed,
1329 participation.to_movement_update()?
1330 ).await?)
1331 } else {
1332 None
1333 };
1334 let movement_id = movement.as_ref().map(|m| m.id());
1335 let input_vtxos = participation.inputs.iter().map(|v| v.id()).collect::<Vec<_>>();
1336 let state = RoundState::new_interactive(participation, movement_id);
1337
1338 self.lock_vtxos(&input_vtxos, movement_id.map(|m| m.into())).await
1339 .context("failed to lock input VTXOs")?;
1340
1341 match (async || {
1342 let id = self.inner.db.store_round_state(&state).await?;
1343 Ok(self.lock_wait_round_state(id).await?
1344 .context("failed to lock fresh round state")?)
1345 })().await {
1346 Ok(state) => {
1347 if let Some(mut m) = movement {
1348 m.stop();
1349 }
1350 Ok(state)
1351 },
1352 Err(e) => {
1353 self.unlock_vtxos(&input_vtxos).await
1354 .context("failed to unlock input VTXOs")?;
1355 if let Some(mut m) = movement {
1356 m.fail().await.context("failed to mark movement as failed")?;
1357 }
1358 Err(e)
1359 },
1360 }
1361 }
1362
1363 pub async fn join_next_round_delegated(
1365 &self,
1366 participation: RoundParticipation,
1367 movement_kind: Option<RoundMovement>,
1368 ) -> anyhow::Result<StoredRoundState<Unlocked>> {
1369 let movement = if let Some(kind) = movement_kind {
1370 Some(self.inner.movements.new_guarded_movement_with_update(
1371 Subsystem::ROUND,
1372 kind.to_string(),
1373 OnDropStatus::Failed,
1374 participation.to_movement_update()?,
1375 ).await?)
1376 } else {
1377 None
1378 };
1379 let movement_id = movement.as_ref().map(|m| m.id());
1380
1381 let input_ids = participation.inputs.iter().map(|v| v.id()).collect::<Vec<_>>();
1382 self.lock_vtxos(&input_ids, movement_id.map(|m| m.into())).await
1383 .context("error locking input VTXOs")?;
1384
1385 match self.join_next_round_delegated_inner(participation, movement_id).await {
1386 Ok(state) => {
1387 if let Some(mut m) = movement {
1388 m.stop();
1389 }
1390 Ok(state)
1391 },
1392 Err(e) => {
1393 self.unlock_vtxos(&input_ids).await
1394 .context("error unlocking input VTXOs")?;
1395 if let Some(mut m) = movement {
1396 m.fail().await.context("error marking movement as failed")?;
1397 }
1398 Err(e)
1399 },
1400 }
1401 }
1402
1403 async fn join_next_round_delegated_inner(
1404 &self,
1405 participation: RoundParticipation,
1406 movement_id: Option<MovementId>,
1407 ) -> anyhow::Result<StoredRoundState<Unlocked>> {
1408 let (mut srv, _) = self.require_server().await?;
1409
1410 let unblinded_mailbox_id = self.mailbox_identifier();
1412
1413 self.register_vtxo_transactions_with_server(&participation.inputs).await
1415 .context("failed to register input vtxo transactions with server")?;
1416
1417 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1419 for vtxo in participation.inputs.iter() {
1420 let keypair = self.get_vtxo_key(vtxo).await
1421 .context("failed to get vtxo keypair")?;
1422 input_vtxos.push(protos::InputVtxo {
1423 vtxo_id: vtxo.id().to_bytes().to_vec(),
1424 attestation: {
1425 let attestation = DelegatedRoundParticipationAttestation::new(
1426 vtxo.id(), &participation.outputs, &keypair,
1427 );
1428 attestation.serialize()
1429 },
1430 });
1431 }
1432
1433 let vtxo_requests = participation.outputs.iter()
1435 .map(|req|
1436 protos::VtxoRequest {
1437 policy: req.policy.serialize(),
1438 amount: req.amount.to_sat(),
1439 })
1440 .collect::<Vec<_>>();
1441
1442 let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1444 input_vtxos,
1445 vtxo_requests,
1446 unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
1447 }).await.context("error submitting round participation to server")?.into_inner();
1448
1449 let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1450 .context("invalid unlock hash from server")?;
1451
1452 let state = RoundState::new_delegated(participation, unlock_hash, movement_id);
1453
1454 info!("Delegated round participation submitted, it will automatically execute \
1455 when you next sync your wallet after the round happened \
1456 (and has sufficient confirmations).",
1457 );
1458
1459 let id = self.inner.db.store_round_state(&state).await?;
1460 Ok(StoredRoundState::new(id, state))
1461 }
1462
1463 pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1465 self.inner.db.get_pending_round_state_ids().await
1466 }
1467
1468 pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1470 let ids = self.inner.db.get_pending_round_state_ids().await?;
1471 let mut states = Vec::with_capacity(ids.len());
1472 for id in ids {
1473 if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1474 states.push(state);
1475 }
1476 }
1477 Ok(states)
1478 }
1479
1480 pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1482 let mut ret = Amount::ZERO;
1483 for round in self.pending_round_states().await? {
1484 ret += round.state().pending_balance();
1485 }
1486 Ok(ret)
1487 }
1488
1489 pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1494 let mut ret = Vec::new();
1495 for round in self.pending_round_states().await? {
1496 let inputs = round.state().locked_pending_inputs();
1497 ret.reserve(inputs.len());
1498 for input in inputs {
1499 let v = self.get_vtxo_by_id(input.id()).await
1500 .context("unknown round input VTXO")?;
1501 ret.push(v);
1502 }
1503 }
1504 Ok(ret)
1505 }
1506
1507 pub async fn sync_pending_rounds(&self) -> anyhow::Result<HashMap<RoundStateId, RoundStatus>> {
1509 let states = self.pending_round_states().await?;
1510 if states.is_empty() {
1511 return Ok(HashMap::new());
1512 }
1513
1514 debug!("Syncing {} pending round states...", states.len());
1515
1516 let ret = Arc::new(parking_lot::Mutex::new(HashMap::with_capacity(states.len())));
1517 tokio_stream::iter(states).for_each_concurrent(10, |state| {
1518 let ret = ret.clone();
1519 async move {
1520 if state.state().ongoing_participation() {
1522 return;
1523 }
1524
1525 let mut state = match self.lock_wait_round_state(state.id()).await {
1526 Ok(Some(state)) => state,
1527 Ok(None) => return,
1528 Err(e) => {
1529 warn!("Error locking round state: {:#}", e);
1530 return;
1531 },
1532 };
1533
1534 let status = match state.state_mut().sync(self).await {
1535 Ok(s) => s,
1536 Err(e) => {
1537 warn!("Error syncing round: {:#}", e);
1538 return;
1539 },
1540 };
1541 trace!("Synced round #{}, status: {:?}", state.id(), status);
1542 match status {
1543 RoundStatus::Confirmed { funding_txid } => {
1544 info!("Round confirmed. Funding tx {}", funding_txid);
1545 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1546 warn!("Error removing confirmed round state from db: {:#}", e);
1547 }
1548 },
1549 RoundStatus::Unconfirmed { funding_txid } => {
1550 info!("Waiting for confirmations for round funding tx {}", funding_txid);
1551 if let Err(e) = self.inner.db.update_round_state(&state).await {
1552 warn!("Error updating pending round state in db: {:#}", e);
1553 }
1554 },
1555 RoundStatus::Pending => {
1556 if let Err(e) = self.inner.db.update_round_state(&state).await {
1557 warn!("Error updating pending round state in db: {:#}", e);
1558 }
1559 },
1560 RoundStatus::Failed { ref error } => {
1561 error!("Round failed: {}", error);
1562 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1563 warn!("Error removing failed round state from db: {:#}", e);
1564 }
1565 },
1566 RoundStatus::Canceled => {
1567 error!("Round canceled");
1568 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1569 warn!("Error removing canceled round state from db: {:#}", e);
1570 }
1571 },
1572 }
1573 ret.lock().insert(state.id(), status);
1574 }
1575 }).await;
1576
1577 Ok(Arc::into_inner(ret).expect("only ref left").into_inner())
1578 }
1579
1580 async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1582 let (mut srv, _) = self.require_server().await?;
1583 let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1584 Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1585 }
1586
1587 async fn inner_process_event(
1588 &self,
1589 state: &mut StoredRoundState,
1590 event: Option<&RoundEvent>,
1591 ) {
1592 if let Some(event) = event && state.state().ongoing_participation() {
1593 let updated = state.state_mut().process_event(self, &event).await;
1594 if updated {
1595 if let Err(e) = self.inner.db.update_round_state(&state).await {
1596 error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1597 }
1598 }
1599 }
1600
1601 match state.state_mut().sync(self).await {
1602 Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1603 Ok(s) if s.is_final() => {
1604 info!("Round #{} finished with result: {:?}", state.id(), s);
1605 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1606 warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1607 }
1608 },
1609 Ok(s) => {
1610 trace!("Round state #{} is now in state {:?}", state.id(), s);
1611 if let Err(e) = self.inner.db.update_round_state(&state).await {
1612 warn!("Error storing round state #{}: {:#}", state.id(), e);
1613 }
1614 },
1615 }
1616 }
1617
1618 pub async fn progress_pending_rounds(
1623 &self,
1624 last_round_event: Option<&RoundEvent>,
1625 ) -> anyhow::Result<()> {
1626 let states = self.pending_round_states().await?;
1627 if states.is_empty() {
1628 return Ok(());
1629 }
1630
1631 info!("Processing {} rounds...", states.len());
1632
1633 let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1634
1635 let has_ongoing_participation = states.iter()
1636 .any(|s| s.state().ongoing_participation());
1637 if has_ongoing_participation && last_round_event.is_none() {
1638 match self.get_last_round_event().await {
1639 Ok(e) => last_round_event = Some(Cow::Owned(e)),
1640 Err(e) => {
1641 warn!("Error fetching round event, \
1642 failed to progress ongoing rounds: {:#}", e);
1643 },
1644 }
1645 }
1646
1647 let event = last_round_event.as_ref().map(|c| c.as_ref());
1648
1649 let futs = states.into_iter().map(async |state| {
1650 let locked = self.lock_wait_round_state(state.id()).await?;
1651 if let Some(mut locked) = locked {
1652 self.inner_process_event(&mut locked, event).await;
1653 }
1654 Ok::<_, anyhow::Error>(())
1655 });
1656
1657 futures::future::join_all(futs).await;
1658
1659 Ok(())
1660 }
1661
1662 pub async fn subscribe_round_events(&self)
1663 -> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1664 {
1665 let (mut srv, _) = self.require_server().await?;
1666 let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1667 .into_inner().map(|m| {
1668 let m = m.context("received error on event stream")?;
1669 let e = RoundEvent::try_from(m.clone())
1670 .with_context(|| format!("error converting rpc round event: {:?}", m))?;
1671 trace!("Received round event: {}", e);
1672 Ok::<_, anyhow::Error>(e)
1673 });
1674 Ok(events)
1675 }
1676
1677 pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1682 let mut events = self.subscribe_round_events().await?;
1683
1684 loop {
1685 let state_ids = self.pending_round_states().await?.iter()
1688 .filter(|s| s.state().ongoing_participation())
1689 .map(|s| s.id())
1690 .collect::<Vec<_>>();
1691
1692 if state_ids.is_empty() {
1693 info!("All rounds handled");
1694 return Ok(());
1695 }
1696
1697 let event = events.next().await
1698 .context("events stream broke")?
1699 .context("error on event stream")?;
1700
1701 let futs = state_ids.into_iter().map(async |state| {
1702 let locked = self.lock_wait_round_state(state).await?;
1703 if let Some(mut locked) = locked {
1704 self.inner_process_event(&mut locked, Some(&event)).await;
1705 }
1706 Ok::<_, anyhow::Error>(())
1707 });
1708
1709 futures::future::join_all(futs).await;
1710 }
1711 }
1712
1713 pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1718 let state_ids = self.inner.db.get_pending_round_state_ids().await?;
1720
1721 let futures = state_ids.into_iter().map(|state_id| {
1722 async move {
1723 let mut state = match self.lock_wait_round_state(state_id).await {
1725 Ok(Some(s)) => s,
1726 Ok(None) => return,
1727 Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1728 };
1729
1730 match state.state_mut().try_cancel(self).await {
1731 Ok(true) => {
1732 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1733 warn!("Error removing canceled round state from db: {:#}", e);
1734 }
1735 },
1736 Ok(false) => {},
1737 Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1738 }
1739 }
1740 });
1741
1742 join_all(futures).await;
1743
1744 Ok(())
1745 }
1746
1747 pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1749 let mut state = self.lock_wait_round_state(id).await?
1750 .context("round state not found")?;
1751
1752 if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1753 self.inner.db.remove_round_state(&state).await
1754 .context("error removing canceled round state from db")?;
1755 } else {
1756 bail!("failed to cancel round");
1757 }
1758
1759 Ok(())
1760 }
1761
1762 pub(crate) async fn participate_round(
1769 &self,
1770 participation: RoundParticipation,
1771 movement_kind: Option<RoundMovement>,
1772 ) -> anyhow::Result<RoundStatus> {
1773 let mut state = self.join_next_round(participation, movement_kind).await?;
1774
1775 info!("Waiting for a round start...");
1776 let mut events = self.subscribe_round_events().await?;
1777
1778 loop {
1779 if !state.state().ongoing_participation() {
1780 let status = state.state_mut().sync(self).await?;
1781 match status {
1782 RoundStatus::Failed { error } => bail!("round failed: {}", error),
1783 RoundStatus::Canceled => bail!("round canceled"),
1784 status => return Ok(status),
1785 }
1786 }
1787
1788 let event = events.next().await
1789 .context("events stream broke")?
1790 .context("error on event stream")?;
1791 if state.state_mut().process_event(self, &event).await {
1792 self.inner.db.update_round_state(&state).await?;
1793 }
1794 }
1795 }
1796}