1use std::iter;
6use std::borrow::Cow;
7use std::convert::Infallible;
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9
10use anyhow::Context;
11use ark::vtxo::VtxoValidationError;
12use bdk_esplora::esplora_client::Amount;
13use bip39::rand;
14use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
15use bitcoin::consensus::encode::{deserialize, serialize_hex};
16use bitcoin::hashes::Hash;
17use bitcoin::hex::DisplayHex;
18use bitcoin::key::Keypair;
19use bitcoin::secp256k1::schnorr;
20use futures::future::{join_all, try_join_all};
21use futures::{Stream, StreamExt};
22use log::{debug, error, info, trace, warn};
23
24use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
25use ark::vtxo::Full;
26use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
27use ark::forfeit::HashLockedForfeitBundle;
28use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
29use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
30use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
31use bitcoin_ext::TxStatus;
32use server_rpc::{protos, ServerConnection, TryFromBytes};
33
34use crate::{SECP, Wallet, WalletVtxo};
35use crate::movement::{MovementId, MovementStatus};
36use crate::movement::update::MovementUpdate;
37use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
38
39const ROUND_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
42use crate::subsystem::{RoundMovement, Subsystem};
43
44
45const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct RoundParticipation {
51 #[serde(with = "ark::encode::serde::vec")]
52 pub inputs: Vec<Vtxo<Full>>,
53 pub outputs: Vec<VtxoRequest>,
56 #[serde(default, skip_serializing_if = "Option::is_none", with = "ark::encode::serde::opt")]
58 pub unblinded_mailbox_id: Option<ark::mailbox::MailboxIdentifier>,
59}
60
61impl RoundParticipation {
62 pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
63 let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
64 let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
65 let fee = input_amount - output_amount;
66 Ok(MovementUpdate::new()
67 .consumed_vtxos(&self.inputs)
68 .intended_balance(SignedAmount::ZERO)
69 .effective_balance( - fee.to_signed()?)
70 .fee(fee)
71 )
72 }
73}
74
75#[derive(Debug, Clone)]
76pub enum RoundStatus {
77 Confirmed {
79 funding_txid: Txid,
80 },
81 Unconfirmed {
83 funding_txid: Txid,
84 },
85 Pending,
87 Failed {
89 error: String,
90 },
91 Canceled,
93}
94
95impl RoundStatus {
96 pub fn is_final(&self) -> bool {
98 match self {
99 Self::Confirmed { .. } => true,
100 Self::Unconfirmed { .. } => false,
101 Self::Pending => false,
102 Self::Failed { .. } => true,
103 Self::Canceled => true,
104 }
105 }
106
107 pub fn is_success(&self) -> bool {
109 match self {
110 Self::Confirmed { .. } => true,
111 Self::Unconfirmed { .. } => true,
112 Self::Pending => false,
113 Self::Failed { .. } => false,
114 Self::Canceled => false,
115 }
116 }
117}
118
119pub struct RoundState {
132 pub(crate) done: bool,
134
135 pub(crate) participation: RoundParticipation,
137
138 pub(crate) flow: RoundFlowState,
140
141 pub(crate) new_vtxos: Vec<Vtxo<Full>>,
146
147 pub(crate) sent_forfeit_sigs: bool,
154
155 pub(crate) movement_id: Option<MovementId>,
157}
158
159impl RoundState {
160 fn new_interactive(
161 participation: RoundParticipation,
162 movement_id: Option<MovementId>,
163 ) -> Self {
164 Self {
165 participation,
166 movement_id,
167 flow: RoundFlowState::InteractivePending,
168 new_vtxos: Vec::new(),
169 sent_forfeit_sigs: false,
170 done: false,
171 }
172 }
173
174 fn new_non_interactive(
175 participation: RoundParticipation,
176 unlock_hash: UnlockHash,
177 movement_id: Option<MovementId>,
178 ) -> Self {
179 Self {
180 participation,
181 movement_id,
182 flow: RoundFlowState::NonInteractivePending { unlock_hash },
183 new_vtxos: Vec::new(),
184 sent_forfeit_sigs: false,
185 done: false,
186 }
187 }
188
189 pub fn participation(&self) -> &RoundParticipation {
191 &self.participation
192 }
193
194 pub fn unlock_hash(&self) -> Option<UnlockHash> {
196 match self.flow {
197 RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
198 RoundFlowState::InteractivePending => None,
199 RoundFlowState::InteractiveOngoing { .. } => None,
200 RoundFlowState::Failed { .. } => None,
201 RoundFlowState::Canceled => None,
202 RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
203 }
204 }
205
206 pub fn funding_tx(&self) -> Option<&Transaction> {
207 match self.flow {
208 RoundFlowState::NonInteractivePending { .. } => None,
209 RoundFlowState::InteractivePending => None,
210 RoundFlowState::InteractiveOngoing { .. } => None,
211 RoundFlowState::Failed { .. } => None,
212 RoundFlowState::Canceled => None,
213 RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
214 }
215 }
216
217 pub fn ongoing_participation(&self) -> bool {
219 match self.flow {
220 RoundFlowState::NonInteractivePending { .. } => false,
221 RoundFlowState::InteractivePending => true,
222 RoundFlowState::InteractiveOngoing { .. } => true,
223 RoundFlowState::Failed { .. } => false,
224 RoundFlowState::Canceled => false,
225 RoundFlowState::Finished { .. } => false,
226 }
227 }
228
229 pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
232 let ret = match self.flow {
233 RoundFlowState::NonInteractivePending { .. } => todo!("we have to cancel with server!"),
234 RoundFlowState::Canceled => true,
235 RoundFlowState::Failed { .. } => true,
236 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
237 self.flow = RoundFlowState::Canceled;
238 true
239 },
240 RoundFlowState::Finished { .. } => false,
241 };
242 if ret {
243 persist_round_failure(wallet, &self.participation, self.movement_id).await
244 .context("failed to persist round failure for cancelation")?;
245 }
246 Ok(ret)
247 }
248
249 async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
250 match start_attempt(wallet, &self.participation, attempt).await {
251 Ok(state) => {
252 self.flow = RoundFlowState::InteractiveOngoing {
253 round_seq: attempt.round_seq,
254 attempt_seq: attempt.attempt_seq,
255 state: state,
256 };
257 },
258 Err(e) => {
259 self.flow = RoundFlowState::Failed {
260 error: format!("{:#}", e),
261 };
262 },
263 }
264 }
265
266 pub async fn process_event(
268 &mut self,
269 wallet: &Wallet,
270 event: &RoundEvent,
271 ) -> bool {
272 let _: Infallible = match self.flow {
273 RoundFlowState::InteractivePending => {
274 if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
275 trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
276 self.try_start_attempt(wallet, e).await;
277 return true;
278 } else {
279 trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
280 event.kind(), event.round_seq(), event.attempt_seq(),
281 );
282 return false;
283 }
284 },
285 RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
286 if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
289 warn!("Round {} failed by server", round_seq);
290 self.flow = RoundFlowState::Failed {
291 error: format!("round {} failed by server", round_seq),
292 };
293 return true;
294 }
295
296 if event.round_seq() > round_seq {
297 self.flow = RoundFlowState::Failed {
300 error: format!("round {} started while we were on {}",
301 event.round_seq(), round_seq,
302 ),
303 };
304 return true;
305 }
306
307 if event.attempt_seq() < attempt_seq {
308 trace!("ignoring replayed message from old attempt");
309 return false;
310 }
311
312 if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
313 trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
314 self.try_start_attempt(wallet, e).await;
315 return true;
316 }
317 trace!("Processing event {} for round attempt {}:{} in state {}",
318 event.kind(), round_seq, attempt_seq, state.kind(),
319 );
320
321 return match progress_attempt(state, wallet, &self.participation, event).await {
322 AttemptProgressResult::NotUpdated => false,
323 AttemptProgressResult::Updated { new_state } => {
324 *state = new_state;
325 true
326 },
327 AttemptProgressResult::Failed(e) => {
328 warn!("Round failed with error: {:#}", e);
329 self.flow = RoundFlowState::Failed {
330 error: format!("{:#}", e),
331 };
332 true
333 },
334 AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
335 self.new_vtxos = vtxos;
336 let funding_txid = funding_tx.compute_txid();
337 self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
338 if let Some(mid) = self.movement_id {
339 if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
340 warn!("Error updating the round funding txid: {:#}", e);
341 }
342 }
343 true
344 },
345 };
346 },
347 RoundFlowState::NonInteractivePending { .. }
348 | RoundFlowState::Finished { .. }
349 | RoundFlowState::Failed { .. }
350 | RoundFlowState::Canceled => return false,
351 };
352 }
353
354 pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
359 match self.flow {
360 RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
361 Ok(RoundStatus::Confirmed {
362 funding_txid: funding_tx.compute_txid(),
363 })
364 },
365
366 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
367 Ok(RoundStatus::Pending)
368 },
369 RoundFlowState::Failed { ref error } => {
370 persist_round_failure(wallet, &self.participation, self.movement_id).await
371 .context("failed to persist round failure")?;
372 Ok(RoundStatus::Failed { error: error.clone() })
373 },
374 RoundFlowState::Canceled => {
375 persist_round_failure(wallet, &self.participation, self.movement_id).await
376 .context("failed to persist round failure")?;
377 Ok(RoundStatus::Canceled)
378 },
379
380 RoundFlowState::NonInteractivePending { unlock_hash } => {
381 match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
382 Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
383 Ok(HarkProgressResult::RoundNotFound) => {
384 self.handle_round_not_found(wallet).await
385 },
386 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
387 let funding_txid = funding_tx.compute_txid();
388 self.new_vtxos = new_vtxos;
389 self.flow = RoundFlowState::Finished {
390 funding_tx: funding_tx.clone(),
391 unlock_hash: unlock_hash,
392 };
393
394 persist_round_success(
395 wallet,
396 &self.participation,
397 self.movement_id,
398 &self.new_vtxos,
399 &funding_tx,
400 ).await.context("failed to store successful round in DB!")?;
401
402 self.done = true;
403
404 Ok(RoundStatus::Confirmed { funding_txid })
405 },
406 Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
407 if let Some(mid) = self.movement_id {
408 update_funding_txid(wallet, mid, funding_txid).await
409 .context("failed to update funding txid in DB")?;
410 }
411 Ok(RoundStatus::Unconfirmed { funding_txid })
412 },
413
414 Err(HarkForfeitError::Err(e)) => {
417 Err(e.context("error progressing non-interactive round"))
421 },
422 Err(HarkForfeitError::SentForfeits(e)) => {
423 self.sent_forfeit_sigs = true;
424 Err(e.context("error progressing non-interactive round \
425 after sending forfeit tx signatures"))
426 },
427 }
428 },
429 RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
431 let funding_txid = funding_tx.compute_txid();
432 let confirmed = check_funding_tx_confirmations(
433 wallet, funding_txid, &funding_tx,
434 ).await.context("error checking funding tx confirmations")?;
435 if !confirmed {
436 trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
437 return Ok(RoundStatus::Unconfirmed { funding_txid });
438 }
439
440 match hark_vtxo_swap(
441 wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
442 ).await {
443 Ok(()) => {
444 persist_round_success(
445 wallet,
446 &self.participation,
447 self.movement_id,
448 &self.new_vtxos,
449 &funding_tx,
450 ).await.context("failed to store successful round in DB!")?;
451
452 self.done = true;
453
454 Ok(RoundStatus::Confirmed { funding_txid })
455 },
456 Err(HarkForfeitError::Err(e)) => {
457 Err(e.context("error forfeiting VTXOs after round"))
458 },
459 Err(HarkForfeitError::SentForfeits(e)) => {
460 self.sent_forfeit_sigs = true;
461 Err(e.context("error after having signed and sent \
462 forfeit signatures to server"))
463 },
464 }
465 },
466 }
467 }
468
469 pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
472 if self.new_vtxos.is_empty() {
473 None
474 } else {
475 Some(&self.new_vtxos)
476 }
477 }
478
479 pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
482 match self.flow {
484 RoundFlowState::NonInteractivePending { .. }
485 | RoundFlowState::InteractivePending
486 | RoundFlowState::InteractiveOngoing { .. }
487 => {
488 &self.participation.inputs
489 },
490 RoundFlowState::Finished { .. } => if self.done {
491 &[]
493 } else {
494 &self.participation.inputs
495 },
496 RoundFlowState::Failed { .. }
497 | RoundFlowState::Canceled
498 => {
499 &[]
501 },
502 }
503 }
504
505 pub fn pending_balance(&self) -> Amount {
509 if self.done {
510 return Amount::ZERO;
511 }
512
513 match self.flow {
514 RoundFlowState::NonInteractivePending { .. }
515 | RoundFlowState::InteractivePending
516 | RoundFlowState::InteractiveOngoing { .. }
517 | RoundFlowState::Finished { .. }
518 => {
519 self.participation.outputs.iter().map(|o| o.amount).sum()
520 },
521 RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
522 Amount::ZERO
523 },
524 }
525 }
526
527 async fn handle_round_not_found(
534 &mut self,
535 wallet: &Wallet,
536 ) -> anyhow::Result<RoundStatus> {
537 info!("Server reports round participation not found (no forfeits sent)");
538 self.flow = RoundFlowState::Failed {
539 error: "server reports round participation not found".into(),
540 };
541 persist_round_failure(wallet, &self.participation, self.movement_id).await
542 .context("failed to persist round failure")?;
543
544 Ok(RoundStatus::Failed {
545 error: "server reports round participation not found".into(),
546 })
547 }
548}
549
550pub enum RoundFlowState {
555 NonInteractivePending {
557 unlock_hash: UnlockHash,
558 },
559
560 InteractivePending,
562 InteractiveOngoing {
564 round_seq: RoundSeq,
565 attempt_seq: usize,
566 state: AttemptState,
567 },
568
569 Finished {
571 funding_tx: Transaction,
572 unlock_hash: UnlockHash,
573 },
574
575 Failed {
577 error: String,
578 },
579
580 Canceled,
582}
583
584pub enum AttemptState {
589 AwaitingAttempt,
590 AwaitingUnsignedVtxoTree {
591 cosign_keys: Vec<Keypair>,
592 secret_nonces: Vec<Vec<DangerousSecretNonce>>,
593 unlock_hash: UnlockHash,
594 },
595 AwaitingFinishedRound {
596 unsigned_round_tx: Transaction,
597 vtxos_spec: VtxoTreeSpec,
598 unlock_hash: UnlockHash,
599 },
600}
601
602impl AttemptState {
603 fn kind(&self) -> &'static str {
605 match self {
606 Self::AwaitingAttempt => "AwaitingAttempt",
607 Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
608 Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
609 }
610 }
611}
612
613enum AttemptProgressResult {
615 Finished {
616 funding_tx: Transaction,
617 vtxos: Vec<Vtxo<Full>>,
618 unlock_hash: UnlockHash,
619 },
620 Failed(anyhow::Error),
621 Updated {
627 new_state: AttemptState,
628 },
629 NotUpdated,
630}
631
632async fn start_attempt(
634 wallet: &Wallet,
635 participation: &RoundParticipation,
636 event: &RoundAttempt,
637) -> anyhow::Result<AttemptState> {
638 let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
639
640 let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
642 .take(participation.outputs.len())
643 .collect::<Vec<_>>();
644
645 let cosign_nonces = cosign_keys.iter()
648 .map(|key| {
649 let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
650 let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
651 for _ in 0..ark_info.nb_round_nonces {
652 let (s, p) = musig::nonce_pair(key);
653 secs.push(s);
654 pubs.push(p);
655 }
656 (secs, pubs)
657 })
658 .take(participation.outputs.len())
659 .collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
660
661
662 debug!("Submitting payment request with {} inputs and {} vtxo outputs",
664 participation.inputs.len(), participation.outputs.len(),
665 );
666
667 let unblinded_mailbox_id = wallet.mailbox_identifier();
669 let signed_reqs = participation.outputs.iter()
670 .zip(cosign_keys.iter())
671 .zip(cosign_nonces.iter())
672 .map(|((req, cosign_key), (_sec, pub_nonces))| {
673 SignedVtxoRequest {
674 vtxo: req.clone(),
675 cosign_pubkey: cosign_key.public_key(),
676 nonces: pub_nonces.clone(),
677 }
678 })
679 .collect::<Vec<_>>();
680
681 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
682 for vtxo in participation.inputs.iter() {
683 let keypair = wallet.get_vtxo_key(vtxo).await
684 .map_err(HarkForfeitError::Err)?;
685 input_vtxos.push(protos::InputVtxo {
686 vtxo_id: vtxo.id().to_bytes().to_vec(),
687 attestation: {
688 let attestation = RoundAttemptAttestation::new(
689 event.challenge, vtxo.id(), &signed_reqs, &keypair,
690 );
691 attestation.serialize()
692 },
693 });
694 }
695
696 wallet.register_vtxo_transactions_with_server(&participation.inputs).await
698 .map_err(HarkForfeitError::Err)?;
699
700 let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
701 input_vtxos: input_vtxos,
702 vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
703 #[allow(deprecated)]
704 offboard_requests: vec![],
705 unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
706 }).await.context("Ark server refused our payment submission")?;
707
708 Ok(AttemptState::AwaitingUnsignedVtxoTree {
709 unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
710 cosign_keys: cosign_keys,
711 secret_nonces: cosign_nonces.into_iter()
712 .map(|(sec, _pub)| sec.into_iter()
713 .map(DangerousSecretNonce::dangerous_from_secret_nonce)
714 .collect())
715 .collect(),
716 })
717}
718
719#[derive(Debug, thiserror::Error)]
721enum HarkForfeitError {
722 #[error("error after forfeits were sent")]
724 SentForfeits(#[source] anyhow::Error),
725 #[error("error before forfeits were sent")]
727 Err(#[source] anyhow::Error),
728}
729
730async fn hark_cosign_leaf(
731 wallet: &Wallet,
732 srv: &mut ServerConnection,
733 funding_tx: &Transaction,
734 vtxo: &mut Vtxo<Full>,
735) -> anyhow::Result<()> {
736 let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
737 .context("error fetching keypair").map_err(HarkForfeitError::Err)?
738 .with_context(|| format!(
739 "keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
740 ))?.1;
741 let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
742 let cosign_resp = srv.client.request_leaf_vtxo_cosign(
743 protos::LeafVtxoCosignRequest::from(cosign_req),
744 ).await
745 .with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
746 .into_inner().try_into()
747 .context("bad leaf vtxo cosign response")?;
748 ensure!(ctx.finalize(vtxo, cosign_resp),
749 "failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
750 );
751
752 Ok(())
753}
754
755async fn hark_vtxo_swap(
765 wallet: &Wallet,
766 participation: &RoundParticipation,
767 output_vtxos: &mut [Vtxo<Full>],
768 funding_tx: &Transaction,
769 unlock_hash: UnlockHash,
770) -> Result<(), HarkForfeitError> {
771 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
772
773 wallet.register_vtxo_transactions_with_server(&participation.inputs).await
775 .context("couldn't send our input vtxo transactions to server")
776 .map_err(HarkForfeitError::Err)?;
777
778 for vtxo in output_vtxos.iter_mut() {
780 hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
781 .map_err(HarkForfeitError::Err)?;
782 }
783
784 let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
787 unlock_hash: unlock_hash.to_byte_array().to_vec(),
788 vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
789 }).await
790 .context("request forfeits nonces call failed")
791 .map_err(HarkForfeitError::Err)?
792 .into_inner().public_nonces.into_iter()
793 .map(|b| musig::PublicNonce::from_bytes(b))
794 .collect::<Result<Vec<_>, _>>()
795 .context("invalid forfeit nonces")
796 .map_err(HarkForfeitError::Err)?;
797
798 if server_nonces.len() != participation.inputs.len() {
799 return Err(HarkForfeitError::Err(anyhow!(
800 "server sent {} nonce pairs, expected {}",
801 server_nonces.len(), participation.inputs.len(),
802 )));
803 }
804
805 let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
806 for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
807 let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
808 .ok().flatten().with_context(|| format!(
809 "failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
810 )).map_err(HarkForfeitError::Err)?.1;
811 forfeit_bundles.push(HashLockedForfeitBundle::new(
812 input, unlock_hash, &user_key, &nonces,
813 ))
814 }
815
816 let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
817 forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
818 }).await
819 .context("forfeit vtxos call failed")
820 .map_err(HarkForfeitError::SentForfeits)?
821 .into_inner().unlock_preimage.as_slice().try_into()
822 .context("invalid preimage length")
823 .map_err(HarkForfeitError::SentForfeits)?;
824
825 for vtxo in output_vtxos.iter_mut() {
826 if !vtxo.provide_unlock_preimage(preimage) {
827 return Err(HarkForfeitError::SentForfeits(anyhow!(
828 "invalid preimage {} for vtxo {} with supposed unlock hash {}",
829 preimage.as_hex(), vtxo.id(), unlock_hash,
830 )));
831 }
832
833 vtxo.validate(&funding_tx).with_context(|| format!(
835 "new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
836 )).map_err(HarkForfeitError::SentForfeits)?;
837 }
838
839 Ok(())
840}
841
842fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
843 match vtxo.validate(funding_tx) {
844 Err(VtxoValidationError::GenesisTransition {
845 genesis_idx, genesis_len, transition_kind, ..
846 }) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
847 Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
848 vtxo.serialize_hex(),
849 )),
850 Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
851 }
852}
853
854fn check_round_matches_participation(
855 part: &RoundParticipation,
856 new_vtxos: &[Vtxo<Full>],
857 funding_tx: &Transaction,
858) -> anyhow::Result<()> {
859 ensure!(new_vtxos.len() == part.outputs.len(),
860 "unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
861 );
862
863 for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
864 ensure!(vtxo.amount() == req.amount,
865 "unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
866 );
867 ensure!(*vtxo.policy() == req.policy,
868 "unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
869 );
870
871 check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
873 }
874
875 Ok(())
876}
877
878async fn check_funding_tx_confirmations(
888 wallet: &Wallet,
889 funding_txid: Txid,
890 funding_tx: &Transaction,
891) -> anyhow::Result<bool> {
892 let tip = wallet.inner.chain.tip().await.context("chain source error")?;
893 let conf_height = tip - wallet.inner.config.round_tx_required_confirmations + 1;
894 let tx_status = wallet.inner.chain.tx_status(funding_txid).await.context("chain source error")?;
895 trace!("Round funding tx {} confirmation status: {:?} (tip={})",
896 funding_txid, tx_status, tip,
897 );
898 match tx_status {
899 TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
900 TxStatus::Mempool | TxStatus::Confirmed(_) => {
901 if wallet.inner.config.round_tx_required_confirmations == 0 {
902 debug!("Accepting round funding tx without confirmations because of configuration");
903 Ok(true)
904 } else {
905 trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
906 Ok(false)
907 }
908 },
909 TxStatus::NotFound => {
910 if let Err(e) = wallet.inner.chain.broadcast_tx(&funding_tx).await {
915 Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
916 funding_txid, serialize_hex(funding_tx), e,
917 ))
918 } else {
919 trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
920 Ok(false)
921 }
922 },
923 }
924}
925
926enum HarkProgressResult {
927 RoundPending,
928 RoundNotFound,
929 FundingTxUnconfirmed {
930 funding_txid: Txid,
931 },
932 Ok {
933 funding_tx: Transaction,
934 new_vtxos: Vec<Vtxo<Full>>,
935 },
936}
937
938async fn progress_non_interactive(
939 wallet: &Wallet,
940 participation: &RoundParticipation,
941 unlock_hash: UnlockHash,
942) -> Result<HarkProgressResult, HarkForfeitError> {
943 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
944
945 let resp = match srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
946 unlock_hash: unlock_hash.to_byte_array().to_vec(),
947 }).await {
948 Ok(resp) => resp.into_inner(),
949 Err(err) if err.code() == tonic::Code::NotFound => {
950 return Ok(HarkProgressResult::RoundNotFound);
951 },
952 Err(err) => {
953 return Err(HarkForfeitError::Err(
954 anyhow::Error::from(err).context("error checking round participation status"),
955 ));
956 },
957 };
958 let status = protos::RoundParticipationStatus::try_from(resp.status)
959 .context("unknown status from server")
960 .map_err(HarkForfeitError::Err) ?;
961
962 if status == protos::RoundParticipationStatus::RoundPartPending {
963 trace!("Hark round still pending");
964 return Ok(HarkProgressResult::RoundPending);
965 }
966
967 if status == protos::RoundParticipationStatus::RoundPartReleased {
972 let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
973 warn!("Server says preimage was already released for hArk participation \
974 with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
975 );
976 }
977
978 let funding_tx_bytes = resp.round_funding_tx
979 .context("funding txid should be provided when status is not pending")
980 .map_err(HarkForfeitError::Err)?;
981 let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
982 .context("invalid funding txid")
983 .map_err(HarkForfeitError::Err)?;
984 let funding_txid = funding_tx.compute_txid();
985 trace!("Funding tx for round participation with unlock hash {}: {} ({})",
986 unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
987 );
988
989 match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
991 Ok(true) => {},
992 Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
993 Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
994 }
995
996 let mut new_vtxos = resp.output_vtxos.into_iter()
997 .map(|v| <Vtxo<Full>>::deserialize(&v))
998 .collect::<Result<Vec<_>, _>>()
999 .context("invalid output VTXOs from server")
1000 .map_err(HarkForfeitError::Err)?;
1001
1002 check_round_matches_participation(participation, &new_vtxos, &funding_tx)
1004 .context("new VTXOs received from server don't match our participation")
1005 .map_err(HarkForfeitError::Err)?;
1006
1007 hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
1008 .context("error forfeiting hArk VTXOs")
1009 .map_err(HarkForfeitError::SentForfeits)?;
1010
1011 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
1012}
1013
1014async fn progress_attempt(
1015 state: &AttemptState,
1016 wallet: &Wallet,
1017 part: &RoundParticipation,
1018 event: &RoundEvent,
1019) -> AttemptProgressResult {
1020 match (state, event) {
1024
1025 (
1026 AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
1027 RoundEvent::VtxoProposal(e),
1028 ) => {
1029 trace!("Received VtxoProposal: {:#?}", e);
1030 match sign_vtxo_tree(
1031 wallet,
1032 part,
1033 &cosign_keys,
1034 &secret_nonces,
1035 &e.unsigned_round_tx,
1036 &e.vtxos_spec,
1037 &e.cosign_agg_nonces,
1038 ).await {
1039 Ok(()) => {
1040 AttemptProgressResult::Updated {
1041 new_state: AttemptState::AwaitingFinishedRound {
1042 unsigned_round_tx: e.unsigned_round_tx.clone(),
1043 vtxos_spec: e.vtxos_spec.clone(),
1044 unlock_hash: *unlock_hash,
1045 },
1046 }
1047 },
1048 Err(e) => {
1049 trace!("Error signing VTXO tree: {:#}", e);
1050 AttemptProgressResult::Failed(e)
1051 },
1052 }
1053 },
1054
1055 (
1056 AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1057 RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1058 ) => {
1059 if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1060 return AttemptProgressResult::Failed(anyhow!(
1061 "signed funding tx ({}) doesn't match tx received before ({})",
1062 signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1063 ));
1064 }
1065
1066 if let Err(e) = wallet.inner.chain.broadcast_tx(&signed_round_tx).await {
1067 warn!("Failed to broadcast signed round tx: {:#}", e);
1068 }
1069
1070 match construct_new_vtxos(
1071 part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1072 ).await {
1073 Ok(v) => AttemptProgressResult::Finished {
1074 funding_tx: signed_round_tx.clone(),
1075 vtxos: v,
1076 unlock_hash: *unlock_hash,
1077 },
1078 Err(e) => AttemptProgressResult::Failed(anyhow!(
1079 "failed to construct new VTXOs for round: {:#}", e,
1080 )),
1081 }
1082 },
1083
1084 (state, RoundEvent::Finished(RoundFinished { .. })) => {
1085 AttemptProgressResult::Failed(anyhow!(
1086 "unexpectedly received a finished round while we were in state {}",
1087 state.kind(),
1088 ))
1089 },
1090
1091 (state, _) => {
1092 trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1093 AttemptProgressResult::NotUpdated
1094 },
1095 }
1096}
1097
1098async fn sign_vtxo_tree(
1099 wallet: &Wallet,
1100 participation: &RoundParticipation,
1101 cosign_keys: &[Keypair],
1102 secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1103 unsigned_round_tx: &Transaction,
1104 vtxo_tree: &VtxoTreeSpec,
1105 cosign_agg_nonces: &[musig::AggregatedNonce],
1106) -> anyhow::Result<()> {
1107 let (srv, _) = wallet.require_server().await.context("server not available")?;
1108
1109 let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1110
1111 let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1113 for vtxo_req in vtxo_tree.iter_vtxos() {
1114 if let Some(i) = my_vtxos.iter().position(|v| {
1115 v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1116 }) {
1117 my_vtxos.swap_remove(i);
1118 }
1119 }
1120 if !my_vtxos.is_empty() {
1121 bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1122 }
1123
1124 let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1125 let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1126 trace!("Sending vtxo signatures to server...");
1127 let _ = try_join_all(iter.map(|((req, key), sec)| async {
1128 let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1129 let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1130 let part_sigs = unsigned_vtxos.cosign_branch(
1131 &cosign_agg_nonces, leaf_idx, key, secret_nonces,
1132 ).context("failed to cosign branch: our request not part of tree")?;
1133
1134 info!("Sending {} partial vtxo cosign signatures for pk {}",
1135 part_sigs.len(), key.public_key(),
1136 );
1137
1138 let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1139 pubkey: key.public_key().serialize().to_vec(),
1140 signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1141 }).await.context("error sending vtxo signatures")?;
1142
1143 Result::<(), anyhow::Error>::Ok(())
1144 })).await.context("error sending VTXO signatures")?;
1145 trace!("Done sending vtxo signatures to server");
1146
1147 Ok(())
1148}
1149
1150async fn construct_new_vtxos(
1151 participation: &RoundParticipation,
1152 unsigned_round_tx: &Transaction,
1153 vtxo_tree: &VtxoTreeSpec,
1154 vtxo_cosign_sigs: &[schnorr::Signature],
1155) -> anyhow::Result<Vec<Vtxo<Full>>> {
1156 let round_txid = unsigned_round_tx.compute_txid();
1157 let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1158 let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1159
1160 if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1162 bail!("Received incorrect vtxo cosign signatures from server");
1164 }
1165
1166 let signed_vtxos = vtxo_tree
1167 .into_signed_tree(vtxo_cosign_sigs.to_vec())
1168 .into_cached_tree();
1169
1170 let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1171 let total_nb_expected_vtxos = expected_vtxos.len();
1172
1173 let mut new_vtxos = vec![];
1174 for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1175 if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1176 let vtxo = signed_vtxos.build_vtxo(idx);
1177
1178 check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1181 .context("constructed invalid vtxo from tree")?;
1182
1183 info!("New VTXO from round: {} ({}, {})",
1184 vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1185 );
1186
1187 new_vtxos.push(vtxo);
1188 expected_vtxos.swap_remove(expected_idx);
1189 }
1190 }
1191
1192 if !expected_vtxos.is_empty() {
1193 if expected_vtxos.len() == total_nb_expected_vtxos {
1194 bail!("None of our VTXOs were present in round!");
1196 } else {
1197 bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1198 expected_vtxos.len(), expected_vtxos,
1199 );
1200 }
1201 }
1202 Ok(new_vtxos)
1203}
1204
1205async fn persist_round_success(
1207 wallet: &Wallet,
1208 participation: &RoundParticipation,
1209 movement_id: Option<MovementId>,
1210 new_vtxos: &[Vtxo<Full>],
1211 funding_tx: &Transaction,
1212) -> anyhow::Result<()> {
1213 debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1214 new_vtxos.len(), movement_id,
1215 );
1216
1217 let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1221 .context("failed to store new VTXOs");
1222 let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1223 .context("failed to mark input VTXOs as spent");
1224 let update_result = if let Some(mid) = movement_id {
1225 wallet.inner.movements.finish_movement_with_update(
1226 mid,
1227 MovementStatus::Successful,
1228 MovementUpdate::new()
1229 .produced_vtxos(new_vtxos)
1230 .metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1231 ).await.context("failed to mark movement as finished")
1232 } else {
1233 Ok(())
1234 };
1235
1236 store_result?;
1237 spent_result?;
1238 update_result?;
1239
1240 Ok(())
1241}
1242
1243async fn persist_round_failure(
1244 wallet: &Wallet,
1245 participation: &RoundParticipation,
1246 movement_id: Option<MovementId>,
1247) -> anyhow::Result<()> {
1248 debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1249 let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1250 let finish_result = if let Some(movement_id) = movement_id {
1251 wallet.inner.movements.finish_movement(movement_id, MovementStatus::Failed).await
1252 } else {
1253 Ok(())
1254 };
1255 if let Err(e) = &finish_result {
1256 error!("Failed to mark movement as failed: {:#}", e);
1257 }
1258 match (unlock_result, finish_result) {
1259 (Ok(()), Ok(())) => Ok(()),
1260 (Err(e), _) => Err(e),
1261 (_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1262 }
1263}
1264
1265async fn update_funding_txid(
1266 wallet: &Wallet,
1267 movement_id: MovementId,
1268 funding_txid: Txid,
1269) -> anyhow::Result<()> {
1270 wallet.inner.movements.update_movement(
1271 movement_id,
1272 MovementUpdate::new()
1273 .metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1274 ).await.context("Unable to update funding txid of round")
1275}
1276
1277impl Wallet {
1278 pub async fn lock_wait_round_state(&self, id: RoundStateId) -> anyhow::Result<Option<StoredRoundState>> {
1283 let guard = self.inner.lock_manager.lock(
1284 &format!("{}.round.{}", self.fingerprint(), id),
1285 ROUND_LOCK_TIMEOUT,
1286 ).await.with_context(|| format!(
1287 "timed out waiting for lock on round state {} (wallet {})",
1288 id, self.fingerprint(),
1289 ))?;
1290
1291 if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1292 return Ok(Some(state.lock(guard)));
1293 }
1294
1295 Ok(None)
1296 }
1297
1298 pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1300 let (mut srv, _) = self.require_server().await?;
1301 let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1302 Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1303 }
1304
1305 pub async fn join_next_round(
1314 &self,
1315 participation: RoundParticipation,
1316 movement_kind: Option<RoundMovement>,
1317 ) -> anyhow::Result<StoredRoundState> {
1318 let movement_id = if let Some(kind) = movement_kind {
1319 Some(self.inner.movements.new_movement_with_update(
1320 Subsystem::ROUND,
1321 kind.to_string(),
1322 participation.to_movement_update()?
1323 ).await?)
1324 } else {
1325 None
1326 };
1327 let state = RoundState::new_interactive(participation, movement_id);
1328
1329 let id = self.inner.db.store_round_state_lock_vtxos(&state).await?;
1330 let state = self.lock_wait_round_state(id).await?
1331 .context("failed to lock fresh round state")?;
1332
1333 Ok(state)
1334 }
1335
1336 pub async fn join_next_round_delegated(
1338 &self,
1339 participation: RoundParticipation,
1340 movement_kind: Option<RoundMovement>,
1341 ) -> anyhow::Result<StoredRoundState<Unlocked>> {
1342 let (mut srv, _) = self.require_server().await?;
1343
1344 let movement_id = if let Some(kind) = movement_kind {
1345 Some(self.inner.movements.new_movement_with_update(
1346 Subsystem::ROUND, kind.to_string(), participation.to_movement_update()?,
1347 ).await?)
1348 } else {
1349 None
1350 };
1351
1352 let unblinded_mailbox_id = self.mailbox_identifier();
1354
1355 self.register_vtxo_transactions_with_server(&participation.inputs).await
1357 .context("failed to register input vtxo transactions with server")?;
1358
1359 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1361 for vtxo in participation.inputs.iter() {
1362 let keypair = self.get_vtxo_key(vtxo).await
1363 .context("failed to get vtxo keypair")?;
1364 input_vtxos.push(protos::InputVtxo {
1365 vtxo_id: vtxo.id().to_bytes().to_vec(),
1366 attestation: {
1367 let attestation = DelegatedRoundParticipationAttestation::new(
1368 vtxo.id(), &participation.outputs, &keypair,
1369 );
1370 attestation.serialize()
1371 },
1372 });
1373 }
1374
1375 let vtxo_requests = participation.outputs.iter()
1377 .map(|req|
1378 protos::VtxoRequest {
1379 policy: req.policy.serialize(),
1380 amount: req.amount.to_sat(),
1381 })
1382 .collect::<Vec<_>>();
1383
1384 let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1386 input_vtxos,
1387 vtxo_requests,
1388 unblinded_mailbox_id: Some(unblinded_mailbox_id.serialize()),
1389 }).await.context("error submitting round participation to server")?.into_inner();
1390
1391 let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1392 .context("invalid unlock hash from server")?;
1393
1394 let state = RoundState::new_non_interactive(participation, unlock_hash, movement_id);
1395
1396 info!("Non-interactive round participation submitted, it will automatically execute \
1397 when you next sync your wallet after the round happened \
1398 (and has sufficient confirmations).",
1399 );
1400
1401 let id = self.inner.db.store_round_state_lock_vtxos(&state).await?;
1402 Ok(StoredRoundState::new(id, state))
1403 }
1404
1405 pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1407 self.inner.db.get_pending_round_state_ids().await
1408 }
1409
1410 pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1412 let ids = self.inner.db.get_pending_round_state_ids().await?;
1413 let mut states = Vec::with_capacity(ids.len());
1414 for id in ids {
1415 if let Some(state) = self.inner.db.get_round_state_by_id(id).await? {
1416 states.push(state);
1417 }
1418 }
1419 Ok(states)
1420 }
1421
1422 pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1424 let mut ret = Amount::ZERO;
1425 for round in self.pending_round_states().await? {
1426 ret += round.state().pending_balance();
1427 }
1428 Ok(ret)
1429 }
1430
1431 pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1436 let mut ret = Vec::new();
1437 for round in self.pending_round_states().await? {
1438 let inputs = round.state().locked_pending_inputs();
1439 ret.reserve(inputs.len());
1440 for input in inputs {
1441 let v = self.get_vtxo_by_id(input.id()).await
1442 .context("unknown round input VTXO")?;
1443 ret.push(v);
1444 }
1445 }
1446 Ok(ret)
1447 }
1448
1449 pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1451 let states = self.pending_round_states().await?;
1452 if states.is_empty() {
1453 return Ok(());
1454 }
1455
1456 debug!("Syncing {} pending round states...", states.len());
1457
1458 tokio_stream::iter(states).for_each_concurrent(10, |state| async move {
1459 if state.state().ongoing_participation() {
1461 return;
1462 }
1463
1464 let mut state = match self.lock_wait_round_state(state.id()).await {
1465 Ok(Some(state)) => state,
1466 Ok(None) => return,
1467 Err(e) => {
1468 warn!("Error locking round state: {:#}", e);
1469 return;
1470 },
1471 };
1472
1473 let status = state.state_mut().sync(self).await;
1474 trace!("Synced round #{}, status: {:?}", state.id(), status);
1475 match status {
1476 Ok(RoundStatus::Confirmed { funding_txid }) => {
1477 info!("Round confirmed. Funding tx {}", funding_txid);
1478 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1479 warn!("Error removing confirmed round state from db: {:#}", e);
1480 }
1481 },
1482 Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1483 info!("Waiting for confirmations for round funding tx {}", funding_txid);
1484 if let Err(e) = self.inner.db.update_round_state(&state).await {
1485 warn!("Error updating pending round state in db: {:#}", e);
1486 }
1487 },
1488 Ok(RoundStatus::Pending) => {
1489 if let Err(e) = self.inner.db.update_round_state(&state).await {
1490 warn!("Error updating pending round state in db: {:#}", e);
1491 }
1492 },
1493 Ok(RoundStatus::Failed { error }) => {
1494 error!("Round failed: {}", error);
1495 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1496 warn!("Error removing failed round state from db: {:#}", e);
1497 }
1498 },
1499 Ok(RoundStatus::Canceled) => {
1500 error!("Round canceled");
1501 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1502 warn!("Error removing canceled round state from db: {:#}", e);
1503 }
1504 },
1505 Err(e) => warn!("Error syncing round: {:#}", e),
1506 }
1507 }).await;
1508
1509 Ok(())
1510 }
1511
1512 async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1514 let (mut srv, _) = self.require_server().await?;
1515 let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1516 Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1517 }
1518
1519 async fn inner_process_event(
1520 &self,
1521 state: &mut StoredRoundState,
1522 event: Option<&RoundEvent>,
1523 ) {
1524 if let Some(event) = event && state.state().ongoing_participation() {
1525 let updated = state.state_mut().process_event(self, &event).await;
1526 if updated {
1527 if let Err(e) = self.inner.db.update_round_state(&state).await {
1528 error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1529 }
1530 }
1531 }
1532
1533 match state.state_mut().sync(self).await {
1534 Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1535 Ok(s) if s.is_final() => {
1536 info!("Round #{} finished with result: {:?}", state.id(), s);
1537 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1538 warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1539 }
1540 },
1541 Ok(s) => {
1542 trace!("Round state #{} is now in state {:?}", state.id(), s);
1543 if let Err(e) = self.inner.db.update_round_state(&state).await {
1544 warn!("Error storing round state #{}: {:#}", state.id(), e);
1545 }
1546 },
1547 }
1548 }
1549
1550 pub async fn progress_pending_rounds(
1555 &self,
1556 last_round_event: Option<&RoundEvent>,
1557 ) -> anyhow::Result<()> {
1558 let states = self.pending_round_states().await?;
1559 if states.is_empty() {
1560 return Ok(());
1561 }
1562
1563 info!("Processing {} rounds...", states.len());
1564
1565 let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1566
1567 let has_ongoing_participation = states.iter()
1568 .any(|s| s.state().ongoing_participation());
1569 if has_ongoing_participation && last_round_event.is_none() {
1570 match self.get_last_round_event().await {
1571 Ok(e) => last_round_event = Some(Cow::Owned(e)),
1572 Err(e) => {
1573 warn!("Error fetching round event, \
1574 failed to progress ongoing rounds: {:#}", e);
1575 },
1576 }
1577 }
1578
1579 let event = last_round_event.as_ref().map(|c| c.as_ref());
1580
1581 let futs = states.into_iter().map(async |state| {
1582 let locked = self.lock_wait_round_state(state.id()).await?;
1583 if let Some(mut locked) = locked {
1584 self.inner_process_event(&mut locked, event).await;
1585 }
1586 Ok::<_, anyhow::Error>(())
1587 });
1588
1589 futures::future::join_all(futs).await;
1590
1591 Ok(())
1592 }
1593
1594 pub async fn subscribe_round_events(&self)
1595 -> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1596 {
1597 let (mut srv, _) = self.require_server().await?;
1598 let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1599 .into_inner().map(|m| {
1600 let m = m.context("received error on event stream")?;
1601 let e = RoundEvent::try_from(m.clone())
1602 .with_context(|| format!("error converting rpc round event: {:?}", m))?;
1603 trace!("Received round event: {}", e);
1604 Ok::<_, anyhow::Error>(e)
1605 });
1606 Ok(events)
1607 }
1608
1609 pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1614 let mut events = self.subscribe_round_events().await?;
1615
1616 loop {
1617 let state_ids = self.pending_round_states().await?.iter()
1620 .filter(|s| s.state().ongoing_participation())
1621 .map(|s| s.id())
1622 .collect::<Vec<_>>();
1623
1624 if state_ids.is_empty() {
1625 info!("All rounds handled");
1626 return Ok(());
1627 }
1628
1629 let event = events.next().await
1630 .context("events stream broke")?
1631 .context("error on event stream")?;
1632
1633 let futs = state_ids.into_iter().map(async |state| {
1634 let locked = self.lock_wait_round_state(state).await?;
1635 if let Some(mut locked) = locked {
1636 self.inner_process_event(&mut locked, Some(&event)).await;
1637 }
1638 Ok::<_, anyhow::Error>(())
1639 });
1640
1641 futures::future::join_all(futs).await;
1642 }
1643 }
1644
1645 pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1650 let state_ids = self.inner.db.get_pending_round_state_ids().await?;
1652
1653 let futures = state_ids.into_iter().map(|state_id| {
1654 async move {
1655 let mut state = match self.lock_wait_round_state(state_id).await {
1657 Ok(Some(s)) => s,
1658 Ok(None) => return,
1659 Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1660 };
1661
1662 match state.state_mut().try_cancel(self).await {
1663 Ok(true) => {
1664 if let Err(e) = self.inner.db.remove_round_state(&state).await {
1665 warn!("Error removing canceled round state from db: {:#}", e);
1666 }
1667 },
1668 Ok(false) => {},
1669 Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1670 }
1671 }
1672 });
1673
1674 join_all(futures).await;
1675
1676 Ok(())
1677 }
1678
1679 pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1681 let mut state = self.lock_wait_round_state(id).await?
1682 .context("round state not found")?;
1683
1684 if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1685 self.inner.db.remove_round_state(&state).await
1686 .context("error removing canceled round state from db")?;
1687 } else {
1688 bail!("failed to cancel round");
1689 }
1690
1691 Ok(())
1692 }
1693
1694 pub(crate) async fn participate_round(
1701 &self,
1702 participation: RoundParticipation,
1703 movement_kind: Option<RoundMovement>,
1704 ) -> anyhow::Result<RoundStatus> {
1705 let mut state = self.join_next_round(participation, movement_kind).await?;
1706
1707 info!("Waiting for a round start...");
1708 let mut events = self.subscribe_round_events().await?;
1709
1710 loop {
1711 if !state.state().ongoing_participation() {
1712 let status = state.state_mut().sync(self).await?;
1713 match status {
1714 RoundStatus::Failed { error } => bail!("round failed: {}", error),
1715 RoundStatus::Canceled => bail!("round canceled"),
1716 status => return Ok(status),
1717 }
1718 }
1719
1720 let event = events.next().await
1721 .context("events stream broke")?
1722 .context("error on event stream")?;
1723 if state.state_mut().process_event(self, &event).await {
1724 self.inner.db.update_round_state(&state).await?;
1725 }
1726 }
1727 }
1728}