1mod lock;
6
7pub(crate) use lock::{RoundStateGuard, RoundStateLockIndex};
8
9use std::iter;
10use std::borrow::Cow;
11use std::convert::Infallible;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use anyhow::Context;
15use ark::vtxo::VtxoValidationError;
16use bdk_esplora::esplora_client::Amount;
17use bip39::rand;
18use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
19use bitcoin::consensus::encode::{deserialize, serialize_hex};
20use bitcoin::hashes::Hash;
21use bitcoin::hex::DisplayHex;
22use bitcoin::key::Keypair;
23use bitcoin::secp256k1::schnorr;
24use futures::future::{join_all, try_join_all};
25use futures::{Stream, StreamExt};
26use log::{debug, error, info, trace, warn};
27
28use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
29use ark::vtxo::Full;
30use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
31use ark::mailbox::MailboxIdentifier;
32use ark::forfeit::HashLockedForfeitBundle;
33use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
34use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
35use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
36use bitcoin_ext::TxStatus;
37use server_rpc::{protos, ServerConnection, TryFromBytes};
38
39use crate::{SECP, Wallet, WalletVtxo};
40use crate::movement::{MovementId, MovementStatus};
41use crate::movement::update::MovementUpdate;
42use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
43use crate::subsystem::{RoundMovement, Subsystem};
44
45
46const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct RoundParticipation {
52 #[serde(with = "ark::encode::serde::vec")]
53 pub inputs: Vec<Vtxo<Full>>,
54 pub outputs: Vec<VtxoRequest>,
57 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub unblinded_mailbox_id: Option<MailboxIdentifier>,
60}
61
62impl RoundParticipation {
63 pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
64 let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
65 let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
66 let fee = input_amount - output_amount;
67 Ok(MovementUpdate::new()
68 .consumed_vtxos(&self.inputs)
69 .intended_balance(SignedAmount::ZERO)
70 .effective_balance( - fee.to_signed()?)
71 .fee(fee)
72 )
73 }
74}
75
76#[derive(Debug, Clone)]
77pub enum RoundStatus {
78 Confirmed {
80 funding_txid: Txid,
81 },
82 Unconfirmed {
84 funding_txid: Txid,
85 },
86 Pending,
88 Failed {
90 error: String,
91 },
92 Canceled,
94}
95
96impl RoundStatus {
97 pub fn is_final(&self) -> bool {
99 match self {
100 Self::Confirmed { .. } => true,
101 Self::Unconfirmed { .. } => false,
102 Self::Pending => false,
103 Self::Failed { .. } => true,
104 Self::Canceled => true,
105 }
106 }
107
108 pub fn is_success(&self) -> bool {
110 match self {
111 Self::Confirmed { .. } => true,
112 Self::Unconfirmed { .. } => true,
113 Self::Pending => false,
114 Self::Failed { .. } => false,
115 Self::Canceled => false,
116 }
117 }
118}
119
120pub struct RoundState {
133 pub(crate) done: bool,
135
136 pub(crate) participation: RoundParticipation,
138
139 pub(crate) flow: RoundFlowState,
141
142 pub(crate) new_vtxos: Vec<Vtxo<Full>>,
147
148 pub(crate) sent_forfeit_sigs: bool,
155
156 pub(crate) movement_id: Option<MovementId>,
158}
159
160impl RoundState {
161 fn new_interactive(
162 participation: RoundParticipation,
163 movement_id: Option<MovementId>,
164 ) -> Self {
165 Self {
166 participation,
167 movement_id,
168 flow: RoundFlowState::InteractivePending,
169 new_vtxos: Vec::new(),
170 sent_forfeit_sigs: false,
171 done: false,
172 }
173 }
174
175 fn new_non_interactive(
176 participation: RoundParticipation,
177 unlock_hash: UnlockHash,
178 movement_id: Option<MovementId>,
179 ) -> Self {
180 Self {
181 participation,
182 movement_id,
183 flow: RoundFlowState::NonInteractivePending { unlock_hash },
184 new_vtxos: Vec::new(),
185 sent_forfeit_sigs: false,
186 done: false,
187 }
188 }
189
190 pub fn participation(&self) -> &RoundParticipation {
192 &self.participation
193 }
194
195 pub fn unlock_hash(&self) -> Option<UnlockHash> {
197 match self.flow {
198 RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
199 RoundFlowState::InteractivePending => None,
200 RoundFlowState::InteractiveOngoing { .. } => None,
201 RoundFlowState::Failed { .. } => None,
202 RoundFlowState::Canceled => None,
203 RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
204 }
205 }
206
207 pub fn funding_tx(&self) -> Option<&Transaction> {
208 match self.flow {
209 RoundFlowState::NonInteractivePending { .. } => None,
210 RoundFlowState::InteractivePending => None,
211 RoundFlowState::InteractiveOngoing { .. } => None,
212 RoundFlowState::Failed { .. } => None,
213 RoundFlowState::Canceled => None,
214 RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
215 }
216 }
217
218 pub fn ongoing_participation(&self) -> bool {
220 match self.flow {
221 RoundFlowState::NonInteractivePending { .. } => false,
222 RoundFlowState::InteractivePending => true,
223 RoundFlowState::InteractiveOngoing { .. } => true,
224 RoundFlowState::Failed { .. } => false,
225 RoundFlowState::Canceled => false,
226 RoundFlowState::Finished { .. } => false,
227 }
228 }
229
230 pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
233 let ret = match self.flow {
234 RoundFlowState::NonInteractivePending { .. } => todo!("we have to cancel with server!"),
235 RoundFlowState::Canceled => true,
236 RoundFlowState::Failed { .. } => true,
237 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
238 self.flow = RoundFlowState::Canceled;
239 true
240 },
241 RoundFlowState::Finished { .. } => false,
242 };
243 if ret {
244 persist_round_failure(wallet, &self.participation, self.movement_id).await
245 .context("failed to persist round failure for cancelation")?;
246 }
247 Ok(ret)
248 }
249
250 async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
251 match start_attempt(wallet, &self.participation, attempt).await {
252 Ok(state) => {
253 self.flow = RoundFlowState::InteractiveOngoing {
254 round_seq: attempt.round_seq,
255 attempt_seq: attempt.attempt_seq,
256 state: state,
257 };
258 },
259 Err(e) => {
260 self.flow = RoundFlowState::Failed {
261 error: format!("{:#}", e),
262 };
263 },
264 }
265 }
266
267 pub async fn process_event(
269 &mut self,
270 wallet: &Wallet,
271 event: &RoundEvent,
272 ) -> bool {
273 let _: Infallible = match self.flow {
274 RoundFlowState::InteractivePending => {
275 if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
276 trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
277 self.try_start_attempt(wallet, e).await;
278 return true;
279 } else {
280 trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
281 event.kind(), event.round_seq(), event.attempt_seq(),
282 );
283 return false;
284 }
285 },
286 RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
287 if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
290 warn!("Round {} failed by server", round_seq);
291 self.flow = RoundFlowState::Failed {
292 error: format!("round {} failed by server", round_seq),
293 };
294 return true;
295 }
296
297 if event.round_seq() > round_seq {
298 self.flow = RoundFlowState::Failed {
301 error: format!("round {} started while we were on {}",
302 event.round_seq(), round_seq,
303 ),
304 };
305 return true;
306 }
307
308 if event.attempt_seq() < attempt_seq {
309 trace!("ignoring replayed message from old attempt");
310 return false;
311 }
312
313 if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
314 trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
315 self.try_start_attempt(wallet, e).await;
316 return true;
317 }
318 trace!("Processing event {} for round attempt {}:{} in state {}",
319 event.kind(), round_seq, attempt_seq, state.kind(),
320 );
321
322 return match progress_attempt(state, wallet, &self.participation, event).await {
323 AttemptProgressResult::NotUpdated => false,
324 AttemptProgressResult::Updated { new_state } => {
325 *state = new_state;
326 true
327 },
328 AttemptProgressResult::Failed(e) => {
329 warn!("Round failed with error: {:#}", e);
330 self.flow = RoundFlowState::Failed {
331 error: format!("{:#}", e),
332 };
333 true
334 },
335 AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
336 self.new_vtxos = vtxos;
337 let funding_txid = funding_tx.compute_txid();
338 self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
339 if let Some(mid) = self.movement_id {
340 if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
341 warn!("Error updating the round funding txid: {:#}", e);
342 }
343 }
344 true
345 },
346 };
347 },
348 RoundFlowState::NonInteractivePending { .. }
349 | RoundFlowState::Finished { .. }
350 | RoundFlowState::Failed { .. }
351 | RoundFlowState::Canceled => return false,
352 };
353 }
354
355 pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
360 match self.flow {
361 RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
362 Ok(RoundStatus::Confirmed {
363 funding_txid: funding_tx.compute_txid(),
364 })
365 },
366
367 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
368 Ok(RoundStatus::Pending)
369 },
370 RoundFlowState::Failed { ref error } => {
371 persist_round_failure(wallet, &self.participation, self.movement_id).await
372 .context("failed to persist round failure")?;
373 Ok(RoundStatus::Failed { error: error.clone() })
374 },
375 RoundFlowState::Canceled => {
376 persist_round_failure(wallet, &self.participation, self.movement_id).await
377 .context("failed to persist round failure")?;
378 Ok(RoundStatus::Canceled)
379 },
380
381 RoundFlowState::NonInteractivePending { unlock_hash } => {
382 match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
383 Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
384 Ok(HarkProgressResult::RoundNotFound) => {
385 self.handle_round_not_found(wallet).await
386 },
387 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
388 let funding_txid = funding_tx.compute_txid();
389 self.new_vtxos = new_vtxos;
390 self.flow = RoundFlowState::Finished {
391 funding_tx: funding_tx.clone(),
392 unlock_hash: unlock_hash,
393 };
394
395 persist_round_success(
396 wallet,
397 &self.participation,
398 self.movement_id,
399 &self.new_vtxos,
400 &funding_tx,
401 ).await.context("failed to store successful round in DB!")?;
402
403 self.done = true;
404
405 Ok(RoundStatus::Confirmed { funding_txid })
406 },
407 Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
408 if let Some(mid) = self.movement_id {
409 update_funding_txid(wallet, mid, funding_txid).await
410 .context("failed to update funding txid in DB")?;
411 }
412 Ok(RoundStatus::Unconfirmed { funding_txid })
413 },
414
415 Err(HarkForfeitError::Err(e)) => {
418 Err(e.context("error progressing non-interactive round"))
422 },
423 Err(HarkForfeitError::SentForfeits(e)) => {
424 self.sent_forfeit_sigs = true;
425 Err(e.context("error progressing non-interactive round \
426 after sending forfeit tx signatures"))
427 },
428 }
429 },
430 RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
432 let funding_txid = funding_tx.compute_txid();
433 let confirmed = check_funding_tx_confirmations(
434 wallet, funding_txid, &funding_tx,
435 ).await.context("error checking funding tx confirmations")?;
436 if !confirmed {
437 trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
438 return Ok(RoundStatus::Unconfirmed { funding_txid });
439 }
440
441 match hark_vtxo_swap(
442 wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
443 ).await {
444 Ok(()) => {
445 persist_round_success(
446 wallet,
447 &self.participation,
448 self.movement_id,
449 &self.new_vtxos,
450 &funding_tx,
451 ).await.context("failed to store successful round in DB!")?;
452
453 self.done = true;
454
455 Ok(RoundStatus::Confirmed { funding_txid })
456 },
457 Err(HarkForfeitError::Err(e)) => {
458 Err(e.context("error forfeiting VTXOs after round"))
459 },
460 Err(HarkForfeitError::SentForfeits(e)) => {
461 self.sent_forfeit_sigs = true;
462 Err(e.context("error after having signed and sent \
463 forfeit signatures to server"))
464 },
465 }
466 },
467 }
468 }
469
470 pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
473 if self.new_vtxos.is_empty() {
474 None
475 } else {
476 Some(&self.new_vtxos)
477 }
478 }
479
480 pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
483 match self.flow {
485 RoundFlowState::NonInteractivePending { .. }
486 | RoundFlowState::InteractivePending
487 | RoundFlowState::InteractiveOngoing { .. }
488 => {
489 &self.participation.inputs
490 },
491 RoundFlowState::Finished { .. } => if self.done {
492 &[]
494 } else {
495 &self.participation.inputs
496 },
497 RoundFlowState::Failed { .. }
498 | RoundFlowState::Canceled
499 => {
500 &[]
502 },
503 }
504 }
505
506 pub fn pending_balance(&self) -> Amount {
510 if self.done {
511 return Amount::ZERO;
512 }
513
514 match self.flow {
515 RoundFlowState::NonInteractivePending { .. }
516 | RoundFlowState::InteractivePending
517 | RoundFlowState::InteractiveOngoing { .. }
518 | RoundFlowState::Finished { .. }
519 => {
520 self.participation.outputs.iter().map(|o| o.amount).sum()
521 },
522 RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
523 Amount::ZERO
524 },
525 }
526 }
527
528 async fn handle_round_not_found(
535 &mut self,
536 wallet: &Wallet,
537 ) -> anyhow::Result<RoundStatus> {
538 info!("Server reports round participation not found (no forfeits sent)");
539 self.flow = RoundFlowState::Failed {
540 error: "server reports round participation not found".into(),
541 };
542 persist_round_failure(wallet, &self.participation, self.movement_id).await
543 .context("failed to persist round failure")?;
544
545 Ok(RoundStatus::Failed {
546 error: "server reports round participation not found".into(),
547 })
548 }
549}
550
551pub enum RoundFlowState {
556 NonInteractivePending {
558 unlock_hash: UnlockHash,
559 },
560
561 InteractivePending,
563 InteractiveOngoing {
565 round_seq: RoundSeq,
566 attempt_seq: usize,
567 state: AttemptState,
568 },
569
570 Finished {
572 funding_tx: Transaction,
573 unlock_hash: UnlockHash,
574 },
575
576 Failed {
578 error: String,
579 },
580
581 Canceled,
583}
584
585pub enum AttemptState {
590 AwaitingAttempt,
591 AwaitingUnsignedVtxoTree {
592 cosign_keys: Vec<Keypair>,
593 secret_nonces: Vec<Vec<DangerousSecretNonce>>,
594 unlock_hash: UnlockHash,
595 },
596 AwaitingFinishedRound {
597 unsigned_round_tx: Transaction,
598 vtxos_spec: VtxoTreeSpec,
599 unlock_hash: UnlockHash,
600 },
601}
602
603impl AttemptState {
604 fn kind(&self) -> &'static str {
606 match self {
607 Self::AwaitingAttempt => "AwaitingAttempt",
608 Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
609 Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
610 }
611 }
612}
613
614enum AttemptProgressResult {
616 Finished {
617 funding_tx: Transaction,
618 vtxos: Vec<Vtxo<Full>>,
619 unlock_hash: UnlockHash,
620 },
621 Failed(anyhow::Error),
622 Updated {
628 new_state: AttemptState,
629 },
630 NotUpdated,
631}
632
633async fn start_attempt(
635 wallet: &Wallet,
636 participation: &RoundParticipation,
637 event: &RoundAttempt,
638) -> anyhow::Result<AttemptState> {
639 let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
640
641 let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
643 .take(participation.outputs.len())
644 .collect::<Vec<_>>();
645
646 let cosign_nonces = cosign_keys.iter()
649 .map(|key| {
650 let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
651 let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
652 for _ in 0..ark_info.nb_round_nonces {
653 let (s, p) = musig::nonce_pair(key);
654 secs.push(s);
655 pubs.push(p);
656 }
657 (secs, pubs)
658 })
659 .take(participation.outputs.len())
660 .collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
661
662
663 debug!("Submitting payment request with {} inputs and {} vtxo outputs",
665 participation.inputs.len(), participation.outputs.len(),
666 );
667
668 let unblinded_mailbox_id = wallet.mailbox_identifier();
670 let signed_reqs = participation.outputs.iter()
671 .zip(cosign_keys.iter())
672 .zip(cosign_nonces.iter())
673 .map(|((req, cosign_key), (_sec, pub_nonces))| {
674 SignedVtxoRequest {
675 vtxo: req.clone(),
676 cosign_pubkey: cosign_key.public_key(),
677 nonces: pub_nonces.clone(),
678 }
679 })
680 .collect::<Vec<_>>();
681
682 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
683 for vtxo in participation.inputs.iter() {
684 let keypair = wallet.get_vtxo_key(vtxo).await
685 .map_err(HarkForfeitError::Err)?;
686 input_vtxos.push(protos::InputVtxo {
687 vtxo_id: vtxo.id().to_bytes().to_vec(),
688 attestation: {
689 let attestation = RoundAttemptAttestation::new(
690 event.challenge, vtxo.id(), &signed_reqs, &keypair,
691 );
692 attestation.serialize()
693 },
694 });
695 }
696
697 wallet.register_vtxo_transactions_with_server(&participation.inputs).await
699 .map_err(HarkForfeitError::Err)?;
700
701 let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
702 input_vtxos: input_vtxos,
703 vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
704 #[allow(deprecated)]
705 offboard_requests: vec![],
706 unblinded_mailbox_id: Some(unblinded_mailbox_id.to_vec()),
707 }).await.context("Ark server refused our payment submission")?;
708
709 Ok(AttemptState::AwaitingUnsignedVtxoTree {
710 unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
711 cosign_keys: cosign_keys,
712 secret_nonces: cosign_nonces.into_iter()
713 .map(|(sec, _pub)| sec.into_iter()
714 .map(DangerousSecretNonce::dangerous_from_secret_nonce)
715 .collect())
716 .collect(),
717 })
718}
719
720#[derive(Debug, thiserror::Error)]
722enum HarkForfeitError {
723 #[error("error after forfeits were sent")]
725 SentForfeits(#[source] anyhow::Error),
726 #[error("error before forfeits were sent")]
728 Err(#[source] anyhow::Error),
729}
730
731async fn hark_cosign_leaf(
732 wallet: &Wallet,
733 srv: &mut ServerConnection,
734 funding_tx: &Transaction,
735 vtxo: &mut Vtxo<Full>,
736) -> anyhow::Result<()> {
737 let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
738 .context("error fetching keypair").map_err(HarkForfeitError::Err)?
739 .with_context(|| format!(
740 "keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
741 ))?.1;
742 let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
743 let cosign_resp = srv.client.request_leaf_vtxo_cosign(
744 protos::LeafVtxoCosignRequest::from(cosign_req),
745 ).await
746 .with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
747 .into_inner().try_into()
748 .context("bad leaf vtxo cosign response")?;
749 ensure!(ctx.finalize(vtxo, cosign_resp),
750 "failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
751 );
752
753 Ok(())
754}
755
756async fn hark_vtxo_swap(
766 wallet: &Wallet,
767 participation: &RoundParticipation,
768 output_vtxos: &mut [Vtxo<Full>],
769 funding_tx: &Transaction,
770 unlock_hash: UnlockHash,
771) -> Result<(), HarkForfeitError> {
772 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
773
774 wallet.register_vtxo_transactions_with_server(&participation.inputs).await
776 .context("couldn't send our input vtxo transactions to server")
777 .map_err(HarkForfeitError::Err)?;
778
779 for vtxo in output_vtxos.iter_mut() {
781 hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
782 .map_err(HarkForfeitError::Err)?;
783 }
784
785 let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
788 unlock_hash: unlock_hash.to_byte_array().to_vec(),
789 vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
790 }).await
791 .context("request forfeits nonces call failed")
792 .map_err(HarkForfeitError::Err)?
793 .into_inner().public_nonces.into_iter()
794 .map(|b| musig::PublicNonce::from_bytes(b))
795 .collect::<Result<Vec<_>, _>>()
796 .context("invalid forfeit nonces")
797 .map_err(HarkForfeitError::Err)?;
798
799 if server_nonces.len() != participation.inputs.len() {
800 return Err(HarkForfeitError::Err(anyhow!(
801 "server sent {} nonce pairs, expected {}",
802 server_nonces.len(), participation.inputs.len(),
803 )));
804 }
805
806 let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
807 for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
808 let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
809 .ok().flatten().with_context(|| format!(
810 "failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
811 )).map_err(HarkForfeitError::Err)?.1;
812 forfeit_bundles.push(HashLockedForfeitBundle::new(
813 input, unlock_hash, &user_key, &nonces,
814 ))
815 }
816
817 let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
818 forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
819 }).await
820 .context("forfeit vtxos call failed")
821 .map_err(HarkForfeitError::SentForfeits)?
822 .into_inner().unlock_preimage.as_slice().try_into()
823 .context("invalid preimage length")
824 .map_err(HarkForfeitError::SentForfeits)?;
825
826 for vtxo in output_vtxos.iter_mut() {
827 if !vtxo.provide_unlock_preimage(preimage) {
828 return Err(HarkForfeitError::SentForfeits(anyhow!(
829 "invalid preimage {} for vtxo {} with supposed unlock hash {}",
830 preimage.as_hex(), vtxo.id(), unlock_hash,
831 )));
832 }
833
834 vtxo.validate(&funding_tx).with_context(|| format!(
836 "new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
837 )).map_err(HarkForfeitError::SentForfeits)?;
838 }
839
840 Ok(())
841}
842
843fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
844 match vtxo.validate(funding_tx) {
845 Err(VtxoValidationError::GenesisTransition {
846 genesis_idx, genesis_len, transition_kind, ..
847 }) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
848 Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
849 vtxo.serialize_hex(),
850 )),
851 Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
852 }
853}
854
855fn check_round_matches_participation(
856 part: &RoundParticipation,
857 new_vtxos: &[Vtxo<Full>],
858 funding_tx: &Transaction,
859) -> anyhow::Result<()> {
860 ensure!(new_vtxos.len() == part.outputs.len(),
861 "unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
862 );
863
864 for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
865 ensure!(vtxo.amount() == req.amount,
866 "unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
867 );
868 ensure!(*vtxo.policy() == req.policy,
869 "unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
870 );
871
872 check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
874 }
875
876 Ok(())
877}
878
879async fn check_funding_tx_confirmations(
889 wallet: &Wallet,
890 funding_txid: Txid,
891 funding_tx: &Transaction,
892) -> anyhow::Result<bool> {
893 let tip = wallet.chain.tip().await.context("chain source error")?;
894 let conf_height = tip - wallet.config.round_tx_required_confirmations + 1;
895 let tx_status = wallet.chain.tx_status(funding_txid).await.context("chain source error")?;
896 trace!("Round funding tx {} confirmation status: {:?} (tip={})",
897 funding_txid, tx_status, tip,
898 );
899 match tx_status {
900 TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
901 TxStatus::Mempool | TxStatus::Confirmed(_) => {
902 if wallet.config.round_tx_required_confirmations == 0 {
903 debug!("Accepting round funding tx without confirmations because of configuration");
904 Ok(true)
905 } else {
906 trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
907 Ok(false)
908 }
909 },
910 TxStatus::NotFound => {
911 if let Err(e) = wallet.chain.broadcast_tx(&funding_tx).await {
916 Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
917 funding_txid, serialize_hex(funding_tx), e,
918 ))
919 } else {
920 trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
921 Ok(false)
922 }
923 },
924 }
925}
926
927enum HarkProgressResult {
928 RoundPending,
929 RoundNotFound,
930 FundingTxUnconfirmed {
931 funding_txid: Txid,
932 },
933 Ok {
934 funding_tx: Transaction,
935 new_vtxos: Vec<Vtxo<Full>>,
936 },
937}
938
939async fn progress_non_interactive(
940 wallet: &Wallet,
941 participation: &RoundParticipation,
942 unlock_hash: UnlockHash,
943) -> Result<HarkProgressResult, HarkForfeitError> {
944 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
945
946 let resp = match srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
947 unlock_hash: unlock_hash.to_byte_array().to_vec(),
948 }).await {
949 Ok(resp) => resp.into_inner(),
950 Err(err) if err.code() == tonic::Code::NotFound => {
951 return Ok(HarkProgressResult::RoundNotFound);
952 },
953 Err(err) => {
954 return Err(HarkForfeitError::Err(
955 anyhow::Error::from(err).context("error checking round participation status"),
956 ));
957 },
958 };
959 let status = protos::RoundParticipationStatus::try_from(resp.status)
960 .context("unknown status from server")
961 .map_err(HarkForfeitError::Err) ?;
962
963 if status == protos::RoundParticipationStatus::RoundPartPending {
964 trace!("Hark round still pending");
965 return Ok(HarkProgressResult::RoundPending);
966 }
967
968 if status == protos::RoundParticipationStatus::RoundPartReleased {
973 let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
974 warn!("Server says preimage was already released for hArk participation \
975 with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
976 );
977 }
978
979 let funding_tx_bytes = resp.round_funding_tx
980 .context("funding txid should be provided when status is not pending")
981 .map_err(HarkForfeitError::Err)?;
982 let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
983 .context("invalid funding txid")
984 .map_err(HarkForfeitError::Err)?;
985 let funding_txid = funding_tx.compute_txid();
986 trace!("Funding tx for round participation with unlock hash {}: {} ({})",
987 unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
988 );
989
990 match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
992 Ok(true) => {},
993 Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
994 Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
995 }
996
997 let mut new_vtxos = resp.output_vtxos.into_iter()
998 .map(|v| <Vtxo<Full>>::deserialize(&v))
999 .collect::<Result<Vec<_>, _>>()
1000 .context("invalid output VTXOs from server")
1001 .map_err(HarkForfeitError::Err)?;
1002
1003 check_round_matches_participation(participation, &new_vtxos, &funding_tx)
1005 .context("new VTXOs received from server don't match our participation")
1006 .map_err(HarkForfeitError::Err)?;
1007
1008 hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
1009 .context("error forfeiting hArk VTXOs")
1010 .map_err(HarkForfeitError::SentForfeits)?;
1011
1012 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
1013}
1014
1015async fn progress_attempt(
1016 state: &AttemptState,
1017 wallet: &Wallet,
1018 part: &RoundParticipation,
1019 event: &RoundEvent,
1020) -> AttemptProgressResult {
1021 match (state, event) {
1025
1026 (
1027 AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
1028 RoundEvent::VtxoProposal(e),
1029 ) => {
1030 trace!("Received VtxoProposal: {:#?}", e);
1031 match sign_vtxo_tree(
1032 wallet,
1033 part,
1034 &cosign_keys,
1035 &secret_nonces,
1036 &e.unsigned_round_tx,
1037 &e.vtxos_spec,
1038 &e.cosign_agg_nonces,
1039 ).await {
1040 Ok(()) => {
1041 AttemptProgressResult::Updated {
1042 new_state: AttemptState::AwaitingFinishedRound {
1043 unsigned_round_tx: e.unsigned_round_tx.clone(),
1044 vtxos_spec: e.vtxos_spec.clone(),
1045 unlock_hash: *unlock_hash,
1046 },
1047 }
1048 },
1049 Err(e) => {
1050 trace!("Error signing VTXO tree: {:#}", e);
1051 AttemptProgressResult::Failed(e)
1052 },
1053 }
1054 },
1055
1056 (
1057 AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1058 RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1059 ) => {
1060 if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1061 return AttemptProgressResult::Failed(anyhow!(
1062 "signed funding tx ({}) doesn't match tx received before ({})",
1063 signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1064 ));
1065 }
1066
1067 if let Err(e) = wallet.chain.broadcast_tx(&signed_round_tx).await {
1068 warn!("Failed to broadcast signed round tx: {:#}", e);
1069 }
1070
1071 match construct_new_vtxos(
1072 part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1073 ).await {
1074 Ok(v) => AttemptProgressResult::Finished {
1075 funding_tx: signed_round_tx.clone(),
1076 vtxos: v,
1077 unlock_hash: *unlock_hash,
1078 },
1079 Err(e) => AttemptProgressResult::Failed(anyhow!(
1080 "failed to construct new VTXOs for round: {:#}", e,
1081 )),
1082 }
1083 },
1084
1085 (state, RoundEvent::Finished(RoundFinished { .. })) => {
1086 AttemptProgressResult::Failed(anyhow!(
1087 "unexpectedly received a finished round while we were in state {}",
1088 state.kind(),
1089 ))
1090 },
1091
1092 (state, _) => {
1093 trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1094 AttemptProgressResult::NotUpdated
1095 },
1096 }
1097}
1098
1099async fn sign_vtxo_tree(
1100 wallet: &Wallet,
1101 participation: &RoundParticipation,
1102 cosign_keys: &[Keypair],
1103 secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1104 unsigned_round_tx: &Transaction,
1105 vtxo_tree: &VtxoTreeSpec,
1106 cosign_agg_nonces: &[musig::AggregatedNonce],
1107) -> anyhow::Result<()> {
1108 let (srv, _) = wallet.require_server().await.context("server not available")?;
1109
1110 let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1111
1112 let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1114 for vtxo_req in vtxo_tree.iter_vtxos() {
1115 if let Some(i) = my_vtxos.iter().position(|v| {
1116 v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1117 }) {
1118 my_vtxos.swap_remove(i);
1119 }
1120 }
1121 if !my_vtxos.is_empty() {
1122 bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1123 }
1124
1125 let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1126 let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1127 trace!("Sending vtxo signatures to server...");
1128 let _ = try_join_all(iter.map(|((req, key), sec)| async {
1129 let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1130 let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1131 let part_sigs = unsigned_vtxos.cosign_branch(
1132 &cosign_agg_nonces, leaf_idx, key, secret_nonces,
1133 ).context("failed to cosign branch: our request not part of tree")?;
1134
1135 info!("Sending {} partial vtxo cosign signatures for pk {}",
1136 part_sigs.len(), key.public_key(),
1137 );
1138
1139 let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1140 pubkey: key.public_key().serialize().to_vec(),
1141 signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1142 }).await.context("error sending vtxo signatures")?;
1143
1144 Result::<(), anyhow::Error>::Ok(())
1145 })).await.context("error sending VTXO signatures")?;
1146 trace!("Done sending vtxo signatures to server");
1147
1148 Ok(())
1149}
1150
1151async fn construct_new_vtxos(
1152 participation: &RoundParticipation,
1153 unsigned_round_tx: &Transaction,
1154 vtxo_tree: &VtxoTreeSpec,
1155 vtxo_cosign_sigs: &[schnorr::Signature],
1156) -> anyhow::Result<Vec<Vtxo<Full>>> {
1157 let round_txid = unsigned_round_tx.compute_txid();
1158 let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1159 let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1160
1161 if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1163 bail!("Received incorrect vtxo cosign signatures from server");
1165 }
1166
1167 let signed_vtxos = vtxo_tree
1168 .into_signed_tree(vtxo_cosign_sigs.to_vec())
1169 .into_cached_tree();
1170
1171 let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1172 let total_nb_expected_vtxos = expected_vtxos.len();
1173
1174 let mut new_vtxos = vec![];
1175 for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1176 if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1177 let vtxo = signed_vtxos.build_vtxo(idx);
1178
1179 check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1182 .context("constructed invalid vtxo from tree")?;
1183
1184 info!("New VTXO from round: {} ({}, {})",
1185 vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1186 );
1187
1188 new_vtxos.push(vtxo);
1189 expected_vtxos.swap_remove(expected_idx);
1190 }
1191 }
1192
1193 if !expected_vtxos.is_empty() {
1194 if expected_vtxos.len() == total_nb_expected_vtxos {
1195 bail!("None of our VTXOs were present in round!");
1197 } else {
1198 bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1199 expected_vtxos.len(), expected_vtxos,
1200 );
1201 }
1202 }
1203 Ok(new_vtxos)
1204}
1205
1206async fn persist_round_success(
1208 wallet: &Wallet,
1209 participation: &RoundParticipation,
1210 movement_id: Option<MovementId>,
1211 new_vtxos: &[Vtxo<Full>],
1212 funding_tx: &Transaction,
1213) -> anyhow::Result<()> {
1214 debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1215 new_vtxos.len(), movement_id,
1216 );
1217
1218 let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1222 .context("failed to store new VTXOs");
1223 let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1224 .context("failed to mark input VTXOs as spent");
1225 let update_result = if let Some(mid) = movement_id {
1226 wallet.movements.finish_movement_with_update(
1227 mid,
1228 MovementStatus::Successful,
1229 MovementUpdate::new()
1230 .produced_vtxos(new_vtxos)
1231 .metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1232 ).await.context("failed to mark movement as finished")
1233 } else {
1234 Ok(())
1235 };
1236
1237 store_result?;
1238 spent_result?;
1239 update_result?;
1240
1241 Ok(())
1242}
1243
1244async fn persist_round_failure(
1245 wallet: &Wallet,
1246 participation: &RoundParticipation,
1247 movement_id: Option<MovementId>,
1248) -> anyhow::Result<()> {
1249 debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1250 let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1251 let finish_result = if let Some(movement_id) = movement_id {
1252 wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
1253 } else {
1254 Ok(())
1255 };
1256 if let Err(e) = &finish_result {
1257 error!("Failed to mark movement as failed: {:#}", e);
1258 }
1259 match (unlock_result, finish_result) {
1260 (Ok(()), Ok(())) => Ok(()),
1261 (Err(e), _) => Err(e),
1262 (_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1263 }
1264}
1265
1266async fn update_funding_txid(
1267 wallet: &Wallet,
1268 movement_id: MovementId,
1269 funding_txid: Txid,
1270) -> anyhow::Result<()> {
1271 wallet.movements.update_movement(
1272 movement_id,
1273 MovementUpdate::new()
1274 .metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1275 ).await.context("Unable to update funding txid of round")
1276}
1277
1278impl Wallet {
1279 pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1281 let (mut srv, _) = self.require_server().await?;
1282 let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1283 Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1284 }
1285
1286 pub async fn join_next_round(
1295 &self,
1296 participation: RoundParticipation,
1297 movement_kind: Option<RoundMovement>,
1298 ) -> anyhow::Result<StoredRoundState> {
1299 let movement_id = if let Some(kind) = movement_kind {
1300 Some(self.movements.new_movement_with_update(
1301 Subsystem::ROUND,
1302 kind.to_string(),
1303 participation.to_movement_update()?
1304 ).await?)
1305 } else {
1306 None
1307 };
1308 let state = RoundState::new_interactive(participation, movement_id);
1309
1310 let id = self.db.store_round_state_lock_vtxos(&state).await?;
1311 let state = self.lock_wait_round_state(id).await?
1312 .context("failed to lock fresh round state")?;
1313
1314 Ok(state)
1315 }
1316
1317 pub async fn join_next_round_delegated(
1319 &self,
1320 participation: RoundParticipation,
1321 movement_kind: Option<RoundMovement>,
1322 ) -> anyhow::Result<StoredRoundState<Unlocked>> {
1323 let (mut srv, _) = self.require_server().await?;
1324
1325 let movement_id = if let Some(kind) = movement_kind {
1326 Some(self.movements.new_movement_with_update(
1327 Subsystem::ROUND, kind.to_string(), participation.to_movement_update()?,
1328 ).await?)
1329 } else {
1330 None
1331 };
1332
1333 let unblinded_mailbox_id = self.mailbox_identifier();
1335
1336 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1338 for vtxo in participation.inputs.iter() {
1339 let keypair = self.get_vtxo_key(vtxo).await
1340 .context("failed to get vtxo keypair")?;
1341 input_vtxos.push(protos::InputVtxo {
1342 vtxo_id: vtxo.id().to_bytes().to_vec(),
1343 attestation: {
1344 let attestation = DelegatedRoundParticipationAttestation::new(
1345 vtxo.id(), &participation.outputs, &keypair,
1346 );
1347 attestation.serialize()
1348 },
1349 });
1350 }
1351
1352 let vtxo_requests = participation.outputs.iter()
1354 .map(|req|
1355 protos::VtxoRequest {
1356 policy: req.policy.serialize(),
1357 amount: req.amount.to_sat(),
1358 })
1359 .collect::<Vec<_>>();
1360
1361 let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1363 input_vtxos,
1364 vtxo_requests,
1365 unblinded_mailbox_id: Some(unblinded_mailbox_id.to_vec()),
1366 }).await.context("error submitting round participation to server")?.into_inner();
1367
1368 let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1369 .context("invalid unlock hash from server")?;
1370
1371 let state = RoundState::new_non_interactive(participation, unlock_hash, movement_id);
1372
1373 info!("Non-interactive round participation submitted, it will automatically execute \
1374 when you next sync your wallet after the round happened \
1375 (and has sufficient confirmations).",
1376 );
1377
1378 let id = self.db.store_round_state_lock_vtxos(&state).await?;
1379 Ok(StoredRoundState::new(id, state))
1380 }
1381
1382 pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1384 self.db.get_pending_round_state_ids().await
1385 }
1386
1387 pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1389 let ids = self.db.get_pending_round_state_ids().await?;
1390 let mut states = Vec::with_capacity(ids.len());
1391 for id in ids {
1392 if let Some(state) = self.db.get_round_state_by_id(id).await? {
1393 states.push(state);
1394 }
1395 }
1396 Ok(states)
1397 }
1398
1399 pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1401 let mut ret = Amount::ZERO;
1402 for round in self.pending_round_states().await? {
1403 ret += round.state().pending_balance();
1404 }
1405 Ok(ret)
1406 }
1407
1408 pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1413 let mut ret = Vec::new();
1414 for round in self.pending_round_states().await? {
1415 let inputs = round.state().locked_pending_inputs();
1416 ret.reserve(inputs.len());
1417 for input in inputs {
1418 let v = self.get_vtxo_by_id(input.id()).await
1419 .context("unknown round input VTXO")?;
1420 ret.push(v);
1421 }
1422 }
1423 Ok(ret)
1424 }
1425
1426 pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1428 let states = self.pending_round_states().await?;
1429 if states.is_empty() {
1430 return Ok(());
1431 }
1432
1433 debug!("Syncing {} pending round states...", states.len());
1434
1435 tokio_stream::iter(states).for_each_concurrent(10, |state| async move {
1436 if state.state().ongoing_participation() {
1438 return;
1439 }
1440
1441 let mut state = match self.lock_wait_round_state(state.id()).await {
1442 Ok(Some(state)) => state,
1443 Ok(None) => return,
1444 Err(e) => {
1445 warn!("Error locking round state: {:#}", e);
1446 return;
1447 },
1448 };
1449
1450 let status = state.state_mut().sync(self).await;
1451 trace!("Synced round #{}, status: {:?}", state.id(), status);
1452 match status {
1453 Ok(RoundStatus::Confirmed { funding_txid }) => {
1454 info!("Round confirmed. Funding tx {}", funding_txid);
1455 if let Err(e) = self.db.remove_round_state(&state).await {
1456 warn!("Error removing confirmed round state from db: {:#}", e);
1457 }
1458 },
1459 Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1460 info!("Waiting for confirmations for round funding tx {}", funding_txid);
1461 if let Err(e) = self.db.update_round_state(&state).await {
1462 warn!("Error updating pending round state in db: {:#}", e);
1463 }
1464 },
1465 Ok(RoundStatus::Pending) => {
1466 if let Err(e) = self.db.update_round_state(&state).await {
1467 warn!("Error updating pending round state in db: {:#}", e);
1468 }
1469 },
1470 Ok(RoundStatus::Failed { error }) => {
1471 error!("Round failed: {}", error);
1472 if let Err(e) = self.db.remove_round_state(&state).await {
1473 warn!("Error removing failed round state from db: {:#}", e);
1474 }
1475 },
1476 Ok(RoundStatus::Canceled) => {
1477 error!("Round canceled");
1478 if let Err(e) = self.db.remove_round_state(&state).await {
1479 warn!("Error removing canceled round state from db: {:#}", e);
1480 }
1481 },
1482 Err(e) => warn!("Error syncing round: {:#}", e),
1483 }
1484 }).await;
1485
1486 Ok(())
1487 }
1488
1489 async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1491 let (mut srv, _) = self.require_server().await?;
1492 let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1493 Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1494 }
1495
1496 async fn inner_process_event(
1497 &self,
1498 state: &mut StoredRoundState,
1499 event: Option<&RoundEvent>,
1500 ) {
1501 if let Some(event) = event && state.state().ongoing_participation() {
1502 let updated = state.state_mut().process_event(self, &event).await;
1503 if updated {
1504 if let Err(e) = self.db.update_round_state(&state).await {
1505 error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1506 }
1507 }
1508 }
1509
1510 match state.state_mut().sync(self).await {
1511 Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1512 Ok(s) if s.is_final() => {
1513 info!("Round #{} finished with result: {:?}", state.id(), s);
1514 if let Err(e) = self.db.remove_round_state(&state).await {
1515 warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1516 }
1517 },
1518 Ok(s) => {
1519 trace!("Round state #{} is now in state {:?}", state.id(), s);
1520 if let Err(e) = self.db.update_round_state(&state).await {
1521 warn!("Error storing round state #{}: {:#}", state.id(), e);
1522 }
1523 },
1524 }
1525 }
1526
1527 pub async fn progress_pending_rounds(
1532 &self,
1533 last_round_event: Option<&RoundEvent>,
1534 ) -> anyhow::Result<()> {
1535 let states = self.pending_round_states().await?;
1536 if states.is_empty() {
1537 return Ok(());
1538 }
1539
1540 info!("Processing {} rounds...", states.len());
1541
1542 let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1543
1544 let has_ongoing_participation = states.iter()
1545 .any(|s| s.state().ongoing_participation());
1546 if has_ongoing_participation && last_round_event.is_none() {
1547 match self.get_last_round_event().await {
1548 Ok(e) => last_round_event = Some(Cow::Owned(e)),
1549 Err(e) => {
1550 warn!("Error fetching round event, \
1551 failed to progress ongoing rounds: {:#}", e);
1552 },
1553 }
1554 }
1555
1556 let event = last_round_event.as_ref().map(|c| c.as_ref());
1557
1558 let futs = states.into_iter().map(async |state| {
1559 let locked = self.lock_wait_round_state(state.id()).await?;
1560 if let Some(mut locked) = locked {
1561 self.inner_process_event(&mut locked, event).await;
1562 }
1563 Ok::<_, anyhow::Error>(())
1564 });
1565
1566 futures::future::join_all(futs).await;
1567
1568 Ok(())
1569 }
1570
1571 pub async fn subscribe_round_events(&self)
1572 -> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1573 {
1574 let (mut srv, _) = self.require_server().await?;
1575 let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1576 .into_inner().map(|m| {
1577 let m = m.context("received error on event stream")?;
1578 let e = RoundEvent::try_from(m.clone())
1579 .with_context(|| format!("error converting rpc round event: {:?}", m))?;
1580 trace!("Received round event: {}", e);
1581 Ok::<_, anyhow::Error>(e)
1582 });
1583 Ok(events)
1584 }
1585
1586 pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1591 let mut events = self.subscribe_round_events().await?;
1592
1593 loop {
1594 let state_ids = self.pending_round_states().await?.iter()
1597 .filter(|s| s.state().ongoing_participation())
1598 .map(|s| s.id())
1599 .collect::<Vec<_>>();
1600
1601 if state_ids.is_empty() {
1602 info!("All rounds handled");
1603 return Ok(());
1604 }
1605
1606 let event = events.next().await
1607 .context("events stream broke")?
1608 .context("error on event stream")?;
1609
1610 let futs = state_ids.into_iter().map(async |state| {
1611 let locked = self.lock_wait_round_state(state).await?;
1612 if let Some(mut locked) = locked {
1613 self.inner_process_event(&mut locked, Some(&event)).await;
1614 }
1615 Ok::<_, anyhow::Error>(())
1616 });
1617
1618 futures::future::join_all(futs).await;
1619 }
1620 }
1621
1622 pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1627 let state_ids = self.db.get_pending_round_state_ids().await?;
1629
1630 let futures = state_ids.into_iter().map(|state_id| {
1631 async move {
1632 let mut state = match self.lock_wait_round_state(state_id).await {
1634 Ok(Some(s)) => s,
1635 Ok(None) => return,
1636 Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1637 };
1638
1639 match state.state_mut().try_cancel(self).await {
1640 Ok(true) => {
1641 if let Err(e) = self.db.remove_round_state(&state).await {
1642 warn!("Error removing canceled round state from db: {:#}", e);
1643 }
1644 },
1645 Ok(false) => {},
1646 Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1647 }
1648 }
1649 });
1650
1651 join_all(futures).await;
1652
1653 Ok(())
1654 }
1655
1656 pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1658 let mut state = self.lock_wait_round_state(id).await?
1659 .context("round state not found")?;
1660
1661 if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1662 self.db.remove_round_state(&state).await
1663 .context("error removing canceled round state from db")?;
1664 } else {
1665 bail!("failed to cancel round");
1666 }
1667
1668 Ok(())
1669 }
1670
1671 pub(crate) async fn participate_round(
1678 &self,
1679 participation: RoundParticipation,
1680 movement_kind: Option<RoundMovement>,
1681 ) -> anyhow::Result<RoundStatus> {
1682 let mut state = self.join_next_round(participation, movement_kind).await?;
1683
1684 info!("Waiting for a round start...");
1685 let mut events = self.subscribe_round_events().await?;
1686
1687 loop {
1688 if !state.state().ongoing_participation() {
1689 let status = state.state_mut().sync(self).await?;
1690 match status {
1691 RoundStatus::Failed { error } => bail!("round failed: {}", error),
1692 RoundStatus::Canceled => bail!("round canceled"),
1693 status => return Ok(status),
1694 }
1695 }
1696
1697 let event = events.next().await
1698 .context("events stream broke")?
1699 .context("error on event stream")?;
1700 if state.state_mut().process_event(self, &event).await {
1701 self.db.update_round_state(&state).await?;
1702 }
1703 }
1704 }
1705}