1mod lock;
6
7pub(crate) use lock::{RoundStateGuard, RoundStateLockIndex};
8
9use std::iter;
10use std::borrow::Cow;
11use std::convert::Infallible;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use anyhow::Context;
15use ark::vtxo::VtxoValidationError;
16use bdk_esplora::esplora_client::Amount;
17use bip39::rand;
18use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
19use bitcoin::consensus::encode::{deserialize, serialize_hex};
20use bitcoin::hashes::Hash;
21use bitcoin::hex::DisplayHex;
22use bitcoin::key::Keypair;
23use bitcoin::secp256k1::schnorr;
24use futures::future::{join_all, try_join_all};
25use futures::{Stream, StreamExt};
26use log::{debug, error, info, trace, warn};
27
28use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
29use ark::vtxo::Full;
30use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
31use ark::mailbox::MailboxIdentifier;
32use ark::forfeit::HashLockedForfeitBundle;
33use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
34use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
35use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
36use bitcoin_ext::TxStatus;
37use server_rpc::{protos, ServerConnection, TryFromBytes};
38
39use crate::{SECP, Wallet, WalletVtxo};
40use crate::movement::{MovementId, MovementStatus};
41use crate::movement::update::MovementUpdate;
42use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
43use crate::subsystem::{RoundMovement, Subsystem};
44
45
46const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
51pub struct RoundParticipation {
52 #[serde(with = "ark::encode::serde::vec")]
53 pub inputs: Vec<Vtxo<Full>>,
54 pub outputs: Vec<VtxoRequest>,
57 #[serde(default, skip_serializing_if = "Option::is_none")]
59 pub unblinded_mailbox_id: Option<MailboxIdentifier>,
60}
61
62impl RoundParticipation {
63 pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
64 let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
65 let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
66 let fee = input_amount - output_amount;
67 Ok(MovementUpdate::new()
68 .consumed_vtxos(&self.inputs)
69 .intended_balance(SignedAmount::ZERO)
70 .effective_balance( - fee.to_signed()?)
71 .fee(fee)
72 )
73 }
74}
75
76#[derive(Debug, Clone)]
77pub enum RoundStatus {
78 Confirmed {
80 funding_txid: Txid,
81 },
82 Unconfirmed {
84 funding_txid: Txid,
85 },
86 Pending,
88 Failed {
90 error: String,
91 },
92 Canceled,
94}
95
96impl RoundStatus {
97 pub fn is_final(&self) -> bool {
99 match self {
100 Self::Confirmed { .. } => true,
101 Self::Unconfirmed { .. } => false,
102 Self::Pending => false,
103 Self::Failed { .. } => true,
104 Self::Canceled => true,
105 }
106 }
107
108 pub fn is_success(&self) -> bool {
110 match self {
111 Self::Confirmed { .. } => true,
112 Self::Unconfirmed { .. } => true,
113 Self::Pending => false,
114 Self::Failed { .. } => false,
115 Self::Canceled => false,
116 }
117 }
118}
119
120pub struct RoundState {
133 pub(crate) done: bool,
135
136 pub(crate) participation: RoundParticipation,
138
139 pub(crate) flow: RoundFlowState,
141
142 pub(crate) new_vtxos: Vec<Vtxo<Full>>,
147
148 pub(crate) sent_forfeit_sigs: bool,
155
156 pub(crate) movement_id: Option<MovementId>,
158}
159
160impl RoundState {
161 fn new_interactive(
162 participation: RoundParticipation,
163 movement_id: Option<MovementId>,
164 ) -> Self {
165 Self {
166 participation,
167 movement_id,
168 flow: RoundFlowState::InteractivePending,
169 new_vtxos: Vec::new(),
170 sent_forfeit_sigs: false,
171 done: false,
172 }
173 }
174
175 fn new_non_interactive(
176 participation: RoundParticipation,
177 unlock_hash: UnlockHash,
178 movement_id: Option<MovementId>,
179 ) -> Self {
180 Self {
181 participation,
182 movement_id,
183 flow: RoundFlowState::NonInteractivePending { unlock_hash },
184 new_vtxos: Vec::new(),
185 sent_forfeit_sigs: false,
186 done: false,
187 }
188 }
189
190 pub fn participation(&self) -> &RoundParticipation {
192 &self.participation
193 }
194
195 pub fn unlock_hash(&self) -> Option<UnlockHash> {
197 match self.flow {
198 RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
199 RoundFlowState::InteractivePending => None,
200 RoundFlowState::InteractiveOngoing { .. } => None,
201 RoundFlowState::Failed { .. } => None,
202 RoundFlowState::Canceled => None,
203 RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
204 }
205 }
206
207 pub fn funding_tx(&self) -> Option<&Transaction> {
208 match self.flow {
209 RoundFlowState::NonInteractivePending { .. } => None,
210 RoundFlowState::InteractivePending => None,
211 RoundFlowState::InteractiveOngoing { .. } => None,
212 RoundFlowState::Failed { .. } => None,
213 RoundFlowState::Canceled => None,
214 RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
215 }
216 }
217
218 pub fn ongoing_participation(&self) -> bool {
220 match self.flow {
221 RoundFlowState::NonInteractivePending { .. } => false,
222 RoundFlowState::InteractivePending => true,
223 RoundFlowState::InteractiveOngoing { .. } => true,
224 RoundFlowState::Failed { .. } => false,
225 RoundFlowState::Canceled => false,
226 RoundFlowState::Finished { .. } => false,
227 }
228 }
229
230 pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
233 let ret = match self.flow {
234 RoundFlowState::NonInteractivePending { .. } => todo!("we have to cancel with server!"),
235 RoundFlowState::Canceled => true,
236 RoundFlowState::Failed { .. } => true,
237 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
238 self.flow = RoundFlowState::Canceled;
239 true
240 },
241 RoundFlowState::Finished { .. } => false,
242 };
243 if ret {
244 persist_round_failure(wallet, &self.participation, self.movement_id).await
245 .context("failed to persist round failure for cancelation")?;
246 }
247 Ok(ret)
248 }
249
250 async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
251 match start_attempt(wallet, &self.participation, attempt).await {
252 Ok(state) => {
253 self.flow = RoundFlowState::InteractiveOngoing {
254 round_seq: attempt.round_seq,
255 attempt_seq: attempt.attempt_seq,
256 state: state,
257 };
258 },
259 Err(e) => {
260 self.flow = RoundFlowState::Failed {
261 error: format!("{:#}", e),
262 };
263 },
264 }
265 }
266
267 pub async fn process_event(
269 &mut self,
270 wallet: &Wallet,
271 event: &RoundEvent,
272 ) -> bool {
273 let _: Infallible = match self.flow {
274 RoundFlowState::InteractivePending => {
275 if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
276 trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
277 self.try_start_attempt(wallet, e).await;
278 return true;
279 } else {
280 trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
281 event.kind(), event.round_seq(), event.attempt_seq(),
282 );
283 return false;
284 }
285 },
286 RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
287 if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
290 warn!("Round {} failed by server", round_seq);
291 self.flow = RoundFlowState::Failed {
292 error: format!("round {} failed by server", round_seq),
293 };
294 return true;
295 }
296
297 if event.round_seq() > round_seq {
298 self.flow = RoundFlowState::Failed {
301 error: format!("round {} started while we were on {}",
302 event.round_seq(), round_seq,
303 ),
304 };
305 return true;
306 }
307
308 if event.attempt_seq() < attempt_seq {
309 trace!("ignoring replayed message from old attempt");
310 return false;
311 }
312
313 if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
314 trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
315 self.try_start_attempt(wallet, e).await;
316 return true;
317 }
318 trace!("Processing event {} for round attempt {}:{} in state {}",
319 event.kind(), round_seq, attempt_seq, state.kind(),
320 );
321
322 return match progress_attempt(state, wallet, &self.participation, event).await {
323 AttemptProgressResult::NotUpdated => false,
324 AttemptProgressResult::Updated { new_state } => {
325 *state = new_state;
326 true
327 },
328 AttemptProgressResult::Failed(e) => {
329 warn!("Round failed with error: {:#}", e);
330 self.flow = RoundFlowState::Failed {
331 error: format!("{:#}", e),
332 };
333 true
334 },
335 AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
336 self.new_vtxos = vtxos;
337 let funding_txid = funding_tx.compute_txid();
338 self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
339 if let Some(mid) = self.movement_id {
340 if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
341 warn!("Error updating the round funding txid: {:#}", e);
342 }
343 }
344 true
345 },
346 };
347 },
348 RoundFlowState::NonInteractivePending { .. }
349 | RoundFlowState::Finished { .. }
350 | RoundFlowState::Failed { .. }
351 | RoundFlowState::Canceled => return false,
352 };
353 }
354
355 pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
360 match self.flow {
361 RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
362 Ok(RoundStatus::Confirmed {
363 funding_txid: funding_tx.compute_txid(),
364 })
365 },
366
367 RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
368 Ok(RoundStatus::Pending)
369 },
370 RoundFlowState::Failed { ref error } => {
371 persist_round_failure(wallet, &self.participation, self.movement_id).await
372 .context("failed to persist round failure")?;
373 Ok(RoundStatus::Failed { error: error.clone() })
374 },
375 RoundFlowState::Canceled => {
376 persist_round_failure(wallet, &self.participation, self.movement_id).await
377 .context("failed to persist round failure")?;
378 Ok(RoundStatus::Canceled)
379 },
380
381 RoundFlowState::NonInteractivePending { unlock_hash } => {
382 match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
383 Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
384 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
385 let funding_txid = funding_tx.compute_txid();
386 self.new_vtxos = new_vtxos;
387 self.flow = RoundFlowState::Finished {
388 funding_tx: funding_tx.clone(),
389 unlock_hash: unlock_hash,
390 };
391
392 persist_round_success(
393 wallet,
394 &self.participation,
395 self.movement_id,
396 &self.new_vtxos,
397 &funding_tx,
398 ).await.context("failed to store successful round in DB!")?;
399
400 self.done = true;
401
402 Ok(RoundStatus::Confirmed { funding_txid })
403 },
404 Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
405 if let Some(mid) = self.movement_id {
406 update_funding_txid(wallet, mid, funding_txid).await
407 .context("failed to update funding txid in DB")?;
408 }
409 Ok(RoundStatus::Unconfirmed { funding_txid })
410 },
411
412 Err(HarkForfeitError::Err(e)) => {
415 Err(e.context("error progressing non-interactive round"))
419 },
420 Err(HarkForfeitError::SentForfeits(e)) => {
421 self.sent_forfeit_sigs = true;
422 Err(e.context("error progressing non-interactive round \
423 after sending forfeit tx signatures"))
424 },
425 }
426 },
427 RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
429 let funding_txid = funding_tx.compute_txid();
430 let confirmed = check_funding_tx_confirmations(
431 wallet, funding_txid, &funding_tx,
432 ).await.context("error checking funding tx confirmations")?;
433 if !confirmed {
434 trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
435 return Ok(RoundStatus::Unconfirmed { funding_txid });
436 }
437
438 match hark_vtxo_swap(
439 wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
440 ).await {
441 Ok(()) => {
442 persist_round_success(
443 wallet,
444 &self.participation,
445 self.movement_id,
446 &self.new_vtxos,
447 &funding_tx,
448 ).await.context("failed to store successful round in DB!")?;
449
450 self.done = true;
451
452 Ok(RoundStatus::Confirmed { funding_txid })
453 },
454 Err(HarkForfeitError::Err(e)) => {
455 Err(e.context("error forfeiting VTXOs after round"))
456 },
457 Err(HarkForfeitError::SentForfeits(e)) => {
458 self.sent_forfeit_sigs = true;
459 Err(e.context("error after having signed and sent \
460 forfeit signatures to server"))
461 },
462 }
463 },
464 }
465 }
466
467 pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
470 if self.new_vtxos.is_empty() {
471 None
472 } else {
473 Some(&self.new_vtxos)
474 }
475 }
476
477 pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
480 match self.flow {
482 RoundFlowState::NonInteractivePending { .. }
483 | RoundFlowState::InteractivePending
484 | RoundFlowState::InteractiveOngoing { .. }
485 => {
486 &self.participation.inputs
487 },
488 RoundFlowState::Finished { .. } => if self.done {
489 &[]
491 } else {
492 &self.participation.inputs
493 },
494 RoundFlowState::Failed { .. }
495 | RoundFlowState::Canceled
496 => {
497 &[]
499 },
500 }
501 }
502
503 pub fn pending_balance(&self) -> Amount {
507 if self.done {
508 return Amount::ZERO;
509 }
510
511 match self.flow {
512 RoundFlowState::NonInteractivePending { .. }
513 | RoundFlowState::InteractivePending
514 | RoundFlowState::InteractiveOngoing { .. }
515 | RoundFlowState::Finished { .. }
516 => {
517 self.participation.outputs.iter().map(|o| o.amount).sum()
518 },
519 RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
520 Amount::ZERO
521 },
522 }
523 }
524}
525
526pub enum RoundFlowState {
531 NonInteractivePending {
533 unlock_hash: UnlockHash,
534 },
535
536 InteractivePending,
538 InteractiveOngoing {
540 round_seq: RoundSeq,
541 attempt_seq: usize,
542 state: AttemptState,
543 },
544
545 Finished {
547 funding_tx: Transaction,
548 unlock_hash: UnlockHash,
549 },
550
551 Failed {
553 error: String,
554 },
555
556 Canceled,
558}
559
560pub enum AttemptState {
565 AwaitingAttempt,
566 AwaitingUnsignedVtxoTree {
567 cosign_keys: Vec<Keypair>,
568 secret_nonces: Vec<Vec<DangerousSecretNonce>>,
569 unlock_hash: UnlockHash,
570 },
571 AwaitingFinishedRound {
572 unsigned_round_tx: Transaction,
573 vtxos_spec: VtxoTreeSpec,
574 unlock_hash: UnlockHash,
575 },
576}
577
578impl AttemptState {
579 fn kind(&self) -> &'static str {
581 match self {
582 Self::AwaitingAttempt => "AwaitingAttempt",
583 Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
584 Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
585 }
586 }
587}
588
589enum AttemptProgressResult {
591 Finished {
592 funding_tx: Transaction,
593 vtxos: Vec<Vtxo<Full>>,
594 unlock_hash: UnlockHash,
595 },
596 Failed(anyhow::Error),
597 Updated {
603 new_state: AttemptState,
604 },
605 NotUpdated,
606}
607
608async fn start_attempt(
610 wallet: &Wallet,
611 participation: &RoundParticipation,
612 event: &RoundAttempt,
613) -> anyhow::Result<AttemptState> {
614 let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
615
616 let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
618 .take(participation.outputs.len())
619 .collect::<Vec<_>>();
620
621 let cosign_nonces = cosign_keys.iter()
624 .map(|key| {
625 let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
626 let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
627 for _ in 0..ark_info.nb_round_nonces {
628 let (s, p) = musig::nonce_pair(key);
629 secs.push(s);
630 pubs.push(p);
631 }
632 (secs, pubs)
633 })
634 .take(participation.outputs.len())
635 .collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
636
637
638 debug!("Submitting payment request with {} inputs and {} vtxo outputs",
640 participation.inputs.len(), participation.outputs.len(),
641 );
642
643 let unblinded_mailbox_id = wallet.mailbox_identifier();
645 let signed_reqs = participation.outputs.iter()
646 .zip(cosign_keys.iter())
647 .zip(cosign_nonces.iter())
648 .map(|((req, cosign_key), (_sec, pub_nonces))| {
649 SignedVtxoRequest {
650 vtxo: req.clone(),
651 cosign_pubkey: cosign_key.public_key(),
652 nonces: pub_nonces.clone(),
653 }
654 })
655 .collect::<Vec<_>>();
656
657 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
658 for vtxo in participation.inputs.iter() {
659 let keypair = wallet.get_vtxo_key(vtxo).await
660 .map_err(HarkForfeitError::Err)?;
661 input_vtxos.push(protos::InputVtxo {
662 vtxo_id: vtxo.id().to_bytes().to_vec(),
663 attestation: {
664 let attestation = RoundAttemptAttestation::new(
665 event.challenge, vtxo.id(), &signed_reqs, &keypair,
666 );
667 attestation.serialize()
668 },
669 });
670 }
671
672 wallet.register_vtxos_with_server(&participation.inputs).await
674 .map_err(HarkForfeitError::Err)?;
675
676 let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
677 input_vtxos: input_vtxos,
678 vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
679 #[allow(deprecated)]
680 offboard_requests: vec![],
681 unblinded_mailbox_id: Some(unblinded_mailbox_id.to_vec()),
682 }).await.context("Ark server refused our payment submission")?;
683
684 Ok(AttemptState::AwaitingUnsignedVtxoTree {
685 unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
686 cosign_keys: cosign_keys,
687 secret_nonces: cosign_nonces.into_iter()
688 .map(|(sec, _pub)| sec.into_iter()
689 .map(DangerousSecretNonce::dangerous_from_secret_nonce)
690 .collect())
691 .collect(),
692 })
693}
694
695#[derive(Debug, thiserror::Error)]
697enum HarkForfeitError {
698 #[error("error after forfeits were sent")]
700 SentForfeits(#[source] anyhow::Error),
701 #[error("error before forfeits were sent")]
703 Err(#[source] anyhow::Error),
704}
705
706async fn hark_cosign_leaf(
707 wallet: &Wallet,
708 srv: &mut ServerConnection,
709 funding_tx: &Transaction,
710 vtxo: &mut Vtxo<Full>,
711) -> anyhow::Result<()> {
712 let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
713 .context("error fetching keypair").map_err(HarkForfeitError::Err)?
714 .with_context(|| format!(
715 "keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
716 ))?.1;
717 let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
718 let cosign_resp = srv.client.request_leaf_vtxo_cosign(
719 protos::LeafVtxoCosignRequest::from(cosign_req),
720 ).await
721 .with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
722 .into_inner().try_into()
723 .context("bad leaf vtxo cosign response")?;
724 ensure!(ctx.finalize(vtxo, cosign_resp),
725 "failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
726 );
727
728 Ok(())
729}
730
731async fn hark_vtxo_swap(
741 wallet: &Wallet,
742 participation: &RoundParticipation,
743 output_vtxos: &mut [Vtxo<Full>],
744 funding_tx: &Transaction,
745 unlock_hash: UnlockHash,
746) -> Result<(), HarkForfeitError> {
747 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
748
749 wallet.register_vtxos_with_server(&participation.inputs).await
751 .context("couldn't send our input vtxos to server")
752 .map_err(HarkForfeitError::Err)?;
753
754 for vtxo in output_vtxos.iter_mut() {
756 hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
757 .map_err(HarkForfeitError::Err)?;
758 }
759
760 let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
763 unlock_hash: unlock_hash.to_byte_array().to_vec(),
764 vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
765 }).await
766 .context("request forfeits nonces call failed")
767 .map_err(HarkForfeitError::Err)?
768 .into_inner().public_nonces.into_iter()
769 .map(|b| musig::PublicNonce::from_bytes(b))
770 .collect::<Result<Vec<_>, _>>()
771 .context("invalid forfeit nonces")
772 .map_err(HarkForfeitError::Err)?;
773
774 if server_nonces.len() != participation.inputs.len() {
775 return Err(HarkForfeitError::Err(anyhow!(
776 "server sent {} nonce pairs, expected {}",
777 server_nonces.len(), participation.inputs.len(),
778 )));
779 }
780
781 let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
782 for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
783 let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
784 .ok().flatten().with_context(|| format!(
785 "failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
786 )).map_err(HarkForfeitError::Err)?.1;
787 forfeit_bundles.push(HashLockedForfeitBundle::new(
788 input, unlock_hash, &user_key, &nonces,
789 ))
790 }
791
792 let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
793 forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
794 }).await
795 .context("forfeit vtxos call failed")
796 .map_err(HarkForfeitError::SentForfeits)?
797 .into_inner().unlock_preimage.as_slice().try_into()
798 .context("invalid preimage length")
799 .map_err(HarkForfeitError::SentForfeits)?;
800
801 for vtxo in output_vtxos.iter_mut() {
802 if !vtxo.provide_unlock_preimage(preimage) {
803 return Err(HarkForfeitError::SentForfeits(anyhow!(
804 "invalid preimage {} for vtxo {} with supposed unlock hash {}",
805 preimage.as_hex(), vtxo.id(), unlock_hash,
806 )));
807 }
808
809 vtxo.validate(&funding_tx).with_context(|| format!(
811 "new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
812 )).map_err(HarkForfeitError::SentForfeits)?;
813 }
814
815 Ok(())
816}
817
818fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
819 match vtxo.validate(funding_tx) {
820 Err(VtxoValidationError::GenesisTransition {
821 genesis_idx, genesis_len, transition_kind, ..
822 }) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
823 Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
824 vtxo.serialize_hex(),
825 )),
826 Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
827 }
828}
829
830fn check_round_matches_participation(
831 part: &RoundParticipation,
832 new_vtxos: &[Vtxo<Full>],
833 funding_tx: &Transaction,
834) -> anyhow::Result<()> {
835 ensure!(new_vtxos.len() == part.outputs.len(),
836 "unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
837 );
838
839 for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
840 ensure!(vtxo.amount() == req.amount,
841 "unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
842 );
843 ensure!(*vtxo.policy() == req.policy,
844 "unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
845 );
846
847 check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
849 }
850
851 Ok(())
852}
853
854async fn check_funding_tx_confirmations(
864 wallet: &Wallet,
865 funding_txid: Txid,
866 funding_tx: &Transaction,
867) -> anyhow::Result<bool> {
868 let tip = wallet.chain.tip().await.context("chain source error")?;
869 let conf_height = tip - wallet.config.round_tx_required_confirmations + 1;
870 let tx_status = wallet.chain.tx_status(funding_txid).await.context("chain source error")?;
871 trace!("Round funding tx {} confirmation status: {:?} (tip={})",
872 funding_txid, tx_status, tip,
873 );
874 match tx_status {
875 TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
876 TxStatus::Mempool | TxStatus::Confirmed(_) => {
877 if wallet.config.round_tx_required_confirmations == 0 {
878 debug!("Accepting round funding tx without confirmations because of configuration");
879 Ok(true)
880 } else {
881 trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
882 Ok(false)
883 }
884 },
885 TxStatus::NotFound => {
886 if let Err(e) = wallet.chain.broadcast_tx(&funding_tx).await {
891 Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
892 funding_txid, serialize_hex(funding_tx), e,
893 ))
894 } else {
895 trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
896 Ok(false)
897 }
898 },
899 }
900}
901
902enum HarkProgressResult {
903 RoundPending,
904 FundingTxUnconfirmed {
905 funding_txid: Txid,
906 },
907 Ok {
908 funding_tx: Transaction,
909 new_vtxos: Vec<Vtxo<Full>>,
910 },
911}
912
913async fn progress_non_interactive(
914 wallet: &Wallet,
915 participation: &RoundParticipation,
916 unlock_hash: UnlockHash,
917) -> Result<HarkProgressResult, HarkForfeitError> {
918 let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
919
920 let resp = srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
921 unlock_hash: unlock_hash.to_byte_array().to_vec(),
922 }).await
923 .context("error checking round participation status")
924 .map_err(HarkForfeitError::Err)?.into_inner();
925 let status = protos::RoundParticipationStatus::try_from(resp.status)
926 .context("unknown status from server")
927 .map_err(HarkForfeitError::Err) ?;
928
929 if status == protos::RoundParticipationStatus::RoundPartPending {
930 trace!("Hark round still pending");
931 return Ok(HarkProgressResult::RoundPending);
932 }
933
934 if status == protos::RoundParticipationStatus::RoundPartReleased {
939 let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
940 warn!("Server says preimage was already released for hArk participation \
941 with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
942 );
943 }
944
945 let funding_tx_bytes = resp.round_funding_tx
946 .context("funding txid should be provided when status is not pending")
947 .map_err(HarkForfeitError::Err)?;
948 let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
949 .context("invalid funding txid")
950 .map_err(HarkForfeitError::Err)?;
951 let funding_txid = funding_tx.compute_txid();
952 trace!("Funding tx for round participation with unlock hash {}: {} ({})",
953 unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
954 );
955
956 match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
958 Ok(true) => {},
959 Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
960 Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
961 }
962
963 let mut new_vtxos = resp.output_vtxos.into_iter()
964 .map(|v| <Vtxo<Full>>::deserialize(&v))
965 .collect::<Result<Vec<_>, _>>()
966 .context("invalid output VTXOs from server")
967 .map_err(HarkForfeitError::Err)?;
968
969 check_round_matches_participation(participation, &new_vtxos, &funding_tx)
971 .context("new VTXOs received from server don't match our participation")
972 .map_err(HarkForfeitError::Err)?;
973
974 hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
975 .context("error forfeiting hArk VTXOs")
976 .map_err(HarkForfeitError::SentForfeits)?;
977
978 Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
979}
980
981async fn progress_attempt(
982 state: &AttemptState,
983 wallet: &Wallet,
984 part: &RoundParticipation,
985 event: &RoundEvent,
986) -> AttemptProgressResult {
987 match (state, event) {
991
992 (
993 AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
994 RoundEvent::VtxoProposal(e),
995 ) => {
996 trace!("Received VtxoProposal: {:#?}", e);
997 match sign_vtxo_tree(
998 wallet,
999 part,
1000 &cosign_keys,
1001 &secret_nonces,
1002 &e.unsigned_round_tx,
1003 &e.vtxos_spec,
1004 &e.cosign_agg_nonces,
1005 ).await {
1006 Ok(()) => {
1007 AttemptProgressResult::Updated {
1008 new_state: AttemptState::AwaitingFinishedRound {
1009 unsigned_round_tx: e.unsigned_round_tx.clone(),
1010 vtxos_spec: e.vtxos_spec.clone(),
1011 unlock_hash: *unlock_hash,
1012 },
1013 }
1014 },
1015 Err(e) => {
1016 trace!("Error signing VTXO tree: {:#}", e);
1017 AttemptProgressResult::Failed(e)
1018 },
1019 }
1020 },
1021
1022 (
1023 AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
1024 RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
1025 ) => {
1026 if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
1027 return AttemptProgressResult::Failed(anyhow!(
1028 "signed funding tx ({}) doesn't match tx received before ({})",
1029 signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
1030 ));
1031 }
1032
1033 if let Err(e) = wallet.chain.broadcast_tx(&signed_round_tx).await {
1034 warn!("Failed to broadcast signed round tx: {:#}", e);
1035 }
1036
1037 match construct_new_vtxos(
1038 part, unsigned_round_tx, vtxos_spec, cosign_sigs,
1039 ).await {
1040 Ok(v) => AttemptProgressResult::Finished {
1041 funding_tx: signed_round_tx.clone(),
1042 vtxos: v,
1043 unlock_hash: *unlock_hash,
1044 },
1045 Err(e) => AttemptProgressResult::Failed(anyhow!(
1046 "failed to construct new VTXOs for round: {:#}", e,
1047 )),
1048 }
1049 },
1050
1051 (state, RoundEvent::Finished(RoundFinished { .. })) => {
1052 AttemptProgressResult::Failed(anyhow!(
1053 "unexpectedly received a finished round while we were in state {}",
1054 state.kind(),
1055 ))
1056 },
1057
1058 (state, _) => {
1059 trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
1060 AttemptProgressResult::NotUpdated
1061 },
1062 }
1063}
1064
1065async fn sign_vtxo_tree(
1066 wallet: &Wallet,
1067 participation: &RoundParticipation,
1068 cosign_keys: &[Keypair],
1069 secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
1070 unsigned_round_tx: &Transaction,
1071 vtxo_tree: &VtxoTreeSpec,
1072 cosign_agg_nonces: &[musig::AggregatedNonce],
1073) -> anyhow::Result<()> {
1074 let (srv, _) = wallet.require_server().await.context("server not available")?;
1075
1076 let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
1077
1078 let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1080 for vtxo_req in vtxo_tree.iter_vtxos() {
1081 if let Some(i) = my_vtxos.iter().position(|v| {
1082 v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
1083 }) {
1084 my_vtxos.swap_remove(i);
1085 }
1086 }
1087 if !my_vtxos.is_empty() {
1088 bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
1089 }
1090
1091 let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1092 let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
1093 trace!("Sending vtxo signatures to server...");
1094 let _ = try_join_all(iter.map(|((req, key), sec)| async {
1095 let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
1096 let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
1097 let part_sigs = unsigned_vtxos.cosign_branch(
1098 &cosign_agg_nonces, leaf_idx, key, secret_nonces,
1099 ).context("failed to cosign branch: our request not part of tree")?;
1100
1101 info!("Sending {} partial vtxo cosign signatures for pk {}",
1102 part_sigs.len(), key.public_key(),
1103 );
1104
1105 let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
1106 pubkey: key.public_key().serialize().to_vec(),
1107 signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
1108 }).await.context("error sending vtxo signatures")?;
1109
1110 Result::<(), anyhow::Error>::Ok(())
1111 })).await.context("error sending VTXO signatures")?;
1112 trace!("Done sending vtxo signatures to server");
1113
1114 Ok(())
1115}
1116
1117async fn construct_new_vtxos(
1118 participation: &RoundParticipation,
1119 unsigned_round_tx: &Transaction,
1120 vtxo_tree: &VtxoTreeSpec,
1121 vtxo_cosign_sigs: &[schnorr::Signature],
1122) -> anyhow::Result<Vec<Vtxo<Full>>> {
1123 let round_txid = unsigned_round_tx.compute_txid();
1124 let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
1125 let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
1126
1127 if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
1129 bail!("Received incorrect vtxo cosign signatures from server");
1131 }
1132
1133 let signed_vtxos = vtxo_tree
1134 .into_signed_tree(vtxo_cosign_sigs.to_vec())
1135 .into_cached_tree();
1136
1137 let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
1138 let total_nb_expected_vtxos = expected_vtxos.len();
1139
1140 let mut new_vtxos = vec![];
1141 for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
1142 if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
1143 let vtxo = signed_vtxos.build_vtxo(idx);
1144
1145 check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
1148 .context("constructed invalid vtxo from tree")?;
1149
1150 info!("New VTXO from round: {} ({}, {})",
1151 vtxo.id(), vtxo.amount(), vtxo.policy_type(),
1152 );
1153
1154 new_vtxos.push(vtxo);
1155 expected_vtxos.swap_remove(expected_idx);
1156 }
1157 }
1158
1159 if !expected_vtxos.is_empty() {
1160 if expected_vtxos.len() == total_nb_expected_vtxos {
1161 bail!("None of our VTXOs were present in round!");
1163 } else {
1164 bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
1165 expected_vtxos.len(), expected_vtxos,
1166 );
1167 }
1168 }
1169 Ok(new_vtxos)
1170}
1171
1172async fn persist_round_success(
1174 wallet: &Wallet,
1175 participation: &RoundParticipation,
1176 movement_id: Option<MovementId>,
1177 new_vtxos: &[Vtxo<Full>],
1178 funding_tx: &Transaction,
1179) -> anyhow::Result<()> {
1180 debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
1181 new_vtxos.len(), movement_id,
1182 );
1183
1184 let store_result = wallet.store_spendable_vtxos(new_vtxos).await
1188 .context("failed to store new VTXOs");
1189 let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
1190 .context("failed to mark input VTXOs as spent");
1191 let update_result = if let Some(mid) = movement_id {
1192 wallet.movements.finish_movement_with_update(
1193 mid,
1194 MovementStatus::Successful,
1195 MovementUpdate::new()
1196 .produced_vtxos(new_vtxos)
1197 .metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
1198 ).await.context("failed to mark movement as finished")
1199 } else {
1200 Ok(())
1201 };
1202
1203 store_result?;
1204 spent_result?;
1205 update_result?;
1206
1207 Ok(())
1208}
1209
1210async fn persist_round_failure(
1211 wallet: &Wallet,
1212 participation: &RoundParticipation,
1213 movement_id: Option<MovementId>,
1214) -> anyhow::Result<()> {
1215 debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
1216 let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
1217 let finish_result = if let Some(movement_id) = movement_id {
1218 wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
1219 } else {
1220 Ok(())
1221 };
1222 if let Err(e) = &finish_result {
1223 error!("Failed to mark movement as failed: {:#}", e);
1224 }
1225 match (unlock_result, finish_result) {
1226 (Ok(()), Ok(())) => Ok(()),
1227 (Err(e), _) => Err(e),
1228 (_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
1229 }
1230}
1231
1232async fn update_funding_txid(
1233 wallet: &Wallet,
1234 movement_id: MovementId,
1235 funding_txid: Txid,
1236) -> anyhow::Result<()> {
1237 wallet.movements.update_movement(
1238 movement_id,
1239 MovementUpdate::new()
1240 .metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
1241 ).await.context("Unable to update funding txid of round")
1242}
1243
1244impl Wallet {
1245 pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
1247 let (mut srv, _) = self.require_server().await?;
1248 let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
1249 Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
1250 }
1251
1252 pub async fn join_next_round(
1261 &self,
1262 participation: RoundParticipation,
1263 movement_kind: Option<RoundMovement>,
1264 ) -> anyhow::Result<StoredRoundState> {
1265 let movement_id = if let Some(kind) = movement_kind {
1266 Some(self.movements.new_movement_with_update(
1267 Subsystem::ROUND,
1268 kind.to_string(),
1269 participation.to_movement_update()?
1270 ).await?)
1271 } else {
1272 None
1273 };
1274 let state = RoundState::new_interactive(participation, movement_id);
1275
1276 let id = self.db.store_round_state_lock_vtxos(&state).await?;
1277 let state = self.lock_wait_round_state(id).await?
1278 .context("failed to lock fresh round state")?;
1279
1280 Ok(state)
1281 }
1282
1283 pub async fn join_next_round_delegated(
1285 &self,
1286 participation: RoundParticipation,
1287 movement_kind: Option<RoundMovement>,
1288 ) -> anyhow::Result<StoredRoundState<Unlocked>> {
1289 let (mut srv, _) = self.require_server().await?;
1290
1291 let movement_id = if let Some(kind) = movement_kind {
1292 Some(self.movements.new_movement_with_update(
1293 Subsystem::ROUND, kind.to_string(), participation.to_movement_update()?,
1294 ).await?)
1295 } else {
1296 None
1297 };
1298
1299 let unblinded_mailbox_id = self.mailbox_identifier();
1301
1302 let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
1304 for vtxo in participation.inputs.iter() {
1305 let keypair = self.get_vtxo_key(vtxo).await
1306 .context("failed to get vtxo keypair")?;
1307 input_vtxos.push(protos::InputVtxo {
1308 vtxo_id: vtxo.id().to_bytes().to_vec(),
1309 attestation: {
1310 let attestation = DelegatedRoundParticipationAttestation::new(
1311 vtxo.id(), &participation.outputs, &keypair,
1312 );
1313 attestation.serialize()
1314 },
1315 });
1316 }
1317
1318 let vtxo_requests = participation.outputs.iter()
1320 .map(|req|
1321 protos::VtxoRequest {
1322 policy: req.policy.serialize(),
1323 amount: req.amount.to_sat(),
1324 })
1325 .collect::<Vec<_>>();
1326
1327 let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
1329 input_vtxos,
1330 vtxo_requests,
1331 unblinded_mailbox_id: Some(unblinded_mailbox_id.to_vec()),
1332 }).await.context("error submitting round participation to server")?.into_inner();
1333
1334 let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
1335 .context("invalid unlock hash from server")?;
1336
1337 let state = RoundState::new_non_interactive(participation, unlock_hash, movement_id);
1338
1339 info!("Non-interactive round participation submitted, it will automatically execute \
1340 when you next sync your wallet after the round happened \
1341 (and has sufficient confirmations).",
1342 );
1343
1344 let id = self.db.store_round_state_lock_vtxos(&state).await?;
1345 Ok(StoredRoundState::new(id, state))
1346 }
1347
1348 pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
1350 self.db.get_pending_round_state_ids().await
1351 }
1352
1353 pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
1355 let ids = self.db.get_pending_round_state_ids().await?;
1356 let mut states = Vec::with_capacity(ids.len());
1357 for id in ids {
1358 if let Some(state) = self.db.get_round_state_by_id(id).await? {
1359 states.push(state);
1360 }
1361 }
1362 Ok(states)
1363 }
1364
1365 pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
1367 let mut ret = Amount::ZERO;
1368 for round in self.pending_round_states().await? {
1369 ret += round.state().pending_balance();
1370 }
1371 Ok(ret)
1372 }
1373
1374 pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
1379 let mut ret = Vec::new();
1380 for round in self.pending_round_states().await? {
1381 let inputs = round.state().locked_pending_inputs();
1382 ret.reserve(inputs.len());
1383 for input in inputs {
1384 let v = self.get_vtxo_by_id(input.id()).await
1385 .context("unknown round input VTXO")?;
1386 ret.push(v);
1387 }
1388 }
1389 Ok(ret)
1390 }
1391
1392 pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
1394 let states = self.pending_round_states().await?;
1395 if states.is_empty() {
1396 return Ok(());
1397 }
1398
1399 debug!("Syncing {} pending round states...", states.len());
1400
1401 tokio_stream::iter(states).for_each_concurrent(10, |state| async move {
1402 if state.state().ongoing_participation() {
1404 return;
1405 }
1406
1407 let mut state = match self.lock_wait_round_state(state.id()).await {
1408 Ok(Some(state)) => state,
1409 Ok(None) => return,
1410 Err(e) => {
1411 warn!("Error locking round state: {:#}", e);
1412 return;
1413 },
1414 };
1415
1416 let status = state.state_mut().sync(self).await;
1417 trace!("Synced round #{}, status: {:?}", state.id(), status);
1418 match status {
1419 Ok(RoundStatus::Confirmed { funding_txid }) => {
1420 info!("Round confirmed. Funding tx {}", funding_txid);
1421 if let Err(e) = self.db.remove_round_state(&state).await {
1422 warn!("Error removing confirmed round state from db: {:#}", e);
1423 }
1424 },
1425 Ok(RoundStatus::Unconfirmed { funding_txid }) => {
1426 info!("Waiting for confirmations for round funding tx {}", funding_txid);
1427 if let Err(e) = self.db.update_round_state(&state).await {
1428 warn!("Error updating pending round state in db: {:#}", e);
1429 }
1430 },
1431 Ok(RoundStatus::Pending) => {
1432 if let Err(e) = self.db.update_round_state(&state).await {
1433 warn!("Error updating pending round state in db: {:#}", e);
1434 }
1435 },
1436 Ok(RoundStatus::Failed { error }) => {
1437 error!("Round failed: {}", error);
1438 if let Err(e) = self.db.remove_round_state(&state).await {
1439 warn!("Error removing failed round state from db: {:#}", e);
1440 }
1441 },
1442 Ok(RoundStatus::Canceled) => {
1443 error!("Round canceled");
1444 if let Err(e) = self.db.remove_round_state(&state).await {
1445 warn!("Error removing canceled round state from db: {:#}", e);
1446 }
1447 },
1448 Err(e) => warn!("Error syncing round: {:#}", e),
1449 }
1450 }).await;
1451
1452 Ok(())
1453 }
1454
1455 async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
1457 let (mut srv, _) = self.require_server().await?;
1458 let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
1459 Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
1460 }
1461
1462 async fn inner_process_event(
1463 &self,
1464 state: &mut StoredRoundState,
1465 event: Option<&RoundEvent>,
1466 ) {
1467 if let Some(event) = event && state.state().ongoing_participation() {
1468 let updated = state.state_mut().process_event(self, &event).await;
1469 if updated {
1470 if let Err(e) = self.db.update_round_state(&state).await {
1471 error!("Error storing round state #{} after progress: {:#}", state.id(), e);
1472 }
1473 }
1474 }
1475
1476 match state.state_mut().sync(self).await {
1477 Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
1478 Ok(s) if s.is_final() => {
1479 info!("Round #{} finished with result: {:?}", state.id(), s);
1480 if let Err(e) = self.db.remove_round_state(&state).await {
1481 warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
1482 }
1483 },
1484 Ok(s) => {
1485 trace!("Round state #{} is now in state {:?}", state.id(), s);
1486 if let Err(e) = self.db.update_round_state(&state).await {
1487 warn!("Error storing round state #{}: {:#}", state.id(), e);
1488 }
1489 },
1490 }
1491 }
1492
1493 pub async fn progress_pending_rounds(
1498 &self,
1499 last_round_event: Option<&RoundEvent>,
1500 ) -> anyhow::Result<()> {
1501 let states = self.pending_round_states().await?;
1502 if states.is_empty() {
1503 return Ok(());
1504 }
1505
1506 info!("Processing {} rounds...", states.len());
1507
1508 let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
1509
1510 let has_ongoing_participation = states.iter()
1511 .any(|s| s.state().ongoing_participation());
1512 if has_ongoing_participation && last_round_event.is_none() {
1513 match self.get_last_round_event().await {
1514 Ok(e) => last_round_event = Some(Cow::Owned(e)),
1515 Err(e) => {
1516 warn!("Error fetching round event, \
1517 failed to progress ongoing rounds: {:#}", e);
1518 },
1519 }
1520 }
1521
1522 let event = last_round_event.as_ref().map(|c| c.as_ref());
1523
1524 let futs = states.into_iter().map(async |state| {
1525 let locked = self.lock_wait_round_state(state.id()).await?;
1526 if let Some(mut locked) = locked {
1527 self.inner_process_event(&mut locked, event).await;
1528 }
1529 Ok::<_, anyhow::Error>(())
1530 });
1531
1532 futures::future::join_all(futs).await;
1533
1534 Ok(())
1535 }
1536
1537 pub async fn subscribe_round_events(&self)
1538 -> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
1539 {
1540 let (mut srv, _) = self.require_server().await?;
1541 let events = srv.client.subscribe_rounds(protos::Empty {}).await?
1542 .into_inner().map(|m| {
1543 let m = m.context("received error on event stream")?;
1544 let e = RoundEvent::try_from(m.clone())
1545 .with_context(|| format!("error converting rpc round event: {:?}", m))?;
1546 trace!("Received round event: {}", e);
1547 Ok::<_, anyhow::Error>(e)
1548 });
1549 Ok(events)
1550 }
1551
1552 pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
1557 let mut events = self.subscribe_round_events().await?;
1558
1559 loop {
1560 let state_ids = self.pending_round_states().await?.iter()
1563 .filter(|s| s.state().ongoing_participation())
1564 .map(|s| s.id())
1565 .collect::<Vec<_>>();
1566
1567 if state_ids.is_empty() {
1568 info!("All rounds handled");
1569 return Ok(());
1570 }
1571
1572 let event = events.next().await
1573 .context("events stream broke")?
1574 .context("error on event stream")?;
1575
1576 let futs = state_ids.into_iter().map(async |state| {
1577 let locked = self.lock_wait_round_state(state).await?;
1578 if let Some(mut locked) = locked {
1579 self.inner_process_event(&mut locked, Some(&event)).await;
1580 }
1581 Ok::<_, anyhow::Error>(())
1582 });
1583
1584 futures::future::join_all(futs).await;
1585 }
1586 }
1587
1588 pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
1593 let state_ids = self.db.get_pending_round_state_ids().await?;
1595
1596 let futures = state_ids.into_iter().map(|state_id| {
1597 async move {
1598 let mut state = match self.lock_wait_round_state(state_id).await {
1600 Ok(Some(s)) => s,
1601 Ok(None) => return,
1602 Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
1603 };
1604
1605 match state.state_mut().try_cancel(self).await {
1606 Ok(true) => {
1607 if let Err(e) = self.db.remove_round_state(&state).await {
1608 warn!("Error removing canceled round state from db: {:#}", e);
1609 }
1610 },
1611 Ok(false) => {},
1612 Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
1613 }
1614 }
1615 });
1616
1617 join_all(futures).await;
1618
1619 Ok(())
1620 }
1621
1622 pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
1624 let mut state = self.lock_wait_round_state(id).await?
1625 .context("round state not found")?;
1626
1627 if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
1628 self.db.remove_round_state(&state).await
1629 .context("error removing canceled round state from db")?;
1630 } else {
1631 bail!("failed to cancel round");
1632 }
1633
1634 Ok(())
1635 }
1636
1637 pub(crate) async fn participate_round(
1644 &self,
1645 participation: RoundParticipation,
1646 movement_kind: Option<RoundMovement>,
1647 ) -> anyhow::Result<RoundStatus> {
1648 let mut state = self.join_next_round(participation, movement_kind).await?;
1649
1650 info!("Waiting for a round start...");
1651 let mut events = self.subscribe_round_events().await?;
1652
1653 loop {
1654 if !state.state().ongoing_participation() {
1655 let status = state.state_mut().sync(self).await?;
1656 match status {
1657 RoundStatus::Failed { error } => bail!("round failed: {}", error),
1658 RoundStatus::Canceled => bail!("round canceled"),
1659 status => return Ok(status),
1660 }
1661 }
1662
1663 let event = events.next().await
1664 .context("events stream broke")?
1665 .context("error on event stream")?;
1666 if state.state_mut().process_event(self, &event).await {
1667 self.db.update_round_state(&state).await?;
1668 }
1669 }
1670 }
1671}