mod lock;
pub(crate) use lock::{RoundStateGuard, RoundStateLockIndex};
use std::iter;
use std::borrow::Cow;
use std::convert::Infallible;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::Context;
use ark::vtxo::VtxoValidationError;
use bdk_esplora::esplora_client::Amount;
use bip39::rand;
use bitcoin::{OutPoint, SignedAmount, Transaction, Txid};
use bitcoin::consensus::encode::{deserialize, serialize_hex};
use bitcoin::hashes::Hash;
use bitcoin::hex::DisplayHex;
use bitcoin::key::Keypair;
use bitcoin::secp256k1::schnorr;
use futures::future::{join_all, try_join_all};
use futures::{Stream, StreamExt};
use log::{debug, error, info, trace, warn};
use ark::{ProtocolEncoding, SignedVtxoRequest, Vtxo, VtxoRequest};
use ark::vtxo::Full;
use ark::attestations::{DelegatedRoundParticipationAttestation, RoundAttemptAttestation};
use ark::mailbox::MailboxIdentifier;
use ark::forfeit::HashLockedForfeitBundle;
use ark::musig::{self, DangerousSecretNonce, PublicNonce, SecretNonce};
use ark::rounds::{RoundAttempt, RoundEvent, RoundFinished, RoundSeq, ROUND_TX_VTXO_TREE_VOUT};
use ark::tree::signed::{LeafVtxoCosignContext, UnlockHash, VtxoTreeSpec};
use bitcoin_ext::TxStatus;
use server_rpc::{protos, ServerConnection, TryFromBytes};
use crate::{SECP, Wallet, WalletVtxo};
use crate::movement::{MovementId, MovementStatus};
use crate::movement::update::MovementUpdate;
use crate::persist::models::{RoundStateId, StoredRoundState, Unlocked};
use crate::subsystem::{RoundMovement, Subsystem};
const HARK_TRANSITION_KIND: &str = "hash-locked-cosigned";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoundParticipation {
#[serde(with = "ark::encode::serde::vec")]
pub inputs: Vec<Vtxo<Full>>,
pub outputs: Vec<VtxoRequest>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub unblinded_mailbox_id: Option<MailboxIdentifier>,
}
impl RoundParticipation {
pub fn to_movement_update(&self) -> anyhow::Result<MovementUpdate> {
let input_amount = self.inputs.iter().map(|i| i.amount()).sum::<Amount>();
let output_amount = self.outputs.iter().map(|r| r.amount).sum::<Amount>();
let fee = input_amount - output_amount;
Ok(MovementUpdate::new()
.consumed_vtxos(&self.inputs)
.intended_balance(SignedAmount::ZERO)
.effective_balance( - fee.to_signed()?)
.fee(fee)
)
}
}
#[derive(Debug, Clone)]
pub enum RoundStatus {
Confirmed {
funding_txid: Txid,
},
Unconfirmed {
funding_txid: Txid,
},
Pending,
Failed {
error: String,
},
Canceled,
}
impl RoundStatus {
pub fn is_final(&self) -> bool {
match self {
Self::Confirmed { .. } => true,
Self::Unconfirmed { .. } => false,
Self::Pending => false,
Self::Failed { .. } => true,
Self::Canceled => true,
}
}
pub fn is_success(&self) -> bool {
match self {
Self::Confirmed { .. } => true,
Self::Unconfirmed { .. } => true,
Self::Pending => false,
Self::Failed { .. } => false,
Self::Canceled => false,
}
}
}
pub struct RoundState {
pub(crate) done: bool,
pub(crate) participation: RoundParticipation,
pub(crate) flow: RoundFlowState,
pub(crate) new_vtxos: Vec<Vtxo<Full>>,
pub(crate) sent_forfeit_sigs: bool,
pub(crate) movement_id: Option<MovementId>,
}
impl RoundState {
fn new_interactive(
participation: RoundParticipation,
movement_id: Option<MovementId>,
) -> Self {
Self {
participation,
movement_id,
flow: RoundFlowState::InteractivePending,
new_vtxos: Vec::new(),
sent_forfeit_sigs: false,
done: false,
}
}
fn new_non_interactive(
participation: RoundParticipation,
unlock_hash: UnlockHash,
movement_id: Option<MovementId>,
) -> Self {
Self {
participation,
movement_id,
flow: RoundFlowState::NonInteractivePending { unlock_hash },
new_vtxos: Vec::new(),
sent_forfeit_sigs: false,
done: false,
}
}
pub fn participation(&self) -> &RoundParticipation {
&self.participation
}
pub fn unlock_hash(&self) -> Option<UnlockHash> {
match self.flow {
RoundFlowState::NonInteractivePending { unlock_hash } => Some(unlock_hash),
RoundFlowState::InteractivePending => None,
RoundFlowState::InteractiveOngoing { .. } => None,
RoundFlowState::Failed { .. } => None,
RoundFlowState::Canceled => None,
RoundFlowState::Finished { unlock_hash, .. } => Some(unlock_hash),
}
}
pub fn funding_tx(&self) -> Option<&Transaction> {
match self.flow {
RoundFlowState::NonInteractivePending { .. } => None,
RoundFlowState::InteractivePending => None,
RoundFlowState::InteractiveOngoing { .. } => None,
RoundFlowState::Failed { .. } => None,
RoundFlowState::Canceled => None,
RoundFlowState::Finished { ref funding_tx, .. } => Some(funding_tx),
}
}
pub fn ongoing_participation(&self) -> bool {
match self.flow {
RoundFlowState::NonInteractivePending { .. } => false,
RoundFlowState::InteractivePending => true,
RoundFlowState::InteractiveOngoing { .. } => true,
RoundFlowState::Failed { .. } => false,
RoundFlowState::Canceled => false,
RoundFlowState::Finished { .. } => false,
}
}
pub async fn try_cancel(&mut self, wallet: &Wallet) -> anyhow::Result<bool> {
let ret = match self.flow {
RoundFlowState::NonInteractivePending { .. } => todo!("we have to cancel with server!"),
RoundFlowState::Canceled => true,
RoundFlowState::Failed { .. } => true,
RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
self.flow = RoundFlowState::Canceled;
true
},
RoundFlowState::Finished { .. } => false,
};
if ret {
persist_round_failure(wallet, &self.participation, self.movement_id).await
.context("failed to persist round failure for cancelation")?;
}
Ok(ret)
}
async fn try_start_attempt(&mut self, wallet: &Wallet, attempt: &RoundAttempt) {
match start_attempt(wallet, &self.participation, attempt).await {
Ok(state) => {
self.flow = RoundFlowState::InteractiveOngoing {
round_seq: attempt.round_seq,
attempt_seq: attempt.attempt_seq,
state: state,
};
},
Err(e) => {
self.flow = RoundFlowState::Failed {
error: format!("{:#}", e),
};
},
}
}
pub async fn process_event(
&mut self,
wallet: &Wallet,
event: &RoundEvent,
) -> bool {
let _: Infallible = match self.flow {
RoundFlowState::InteractivePending => {
if let RoundEvent::Attempt(e) = event && e.attempt_seq == 0 {
trace!("Joining round attempt {}:{}", e.round_seq, e.attempt_seq);
self.try_start_attempt(wallet, e).await;
return true;
} else {
trace!("Ignoring {} event (seq {}:{}), waiting for round to start",
event.kind(), event.round_seq(), event.attempt_seq(),
);
return false;
}
},
RoundFlowState::InteractiveOngoing { round_seq, attempt_seq, ref mut state } => {
if let RoundEvent::Failed(e) = event && e.round_seq == round_seq {
warn!("Round {} failed by server", round_seq);
self.flow = RoundFlowState::Failed {
error: format!("round {} failed by server", round_seq),
};
return true;
}
if event.round_seq() > round_seq {
self.flow = RoundFlowState::Failed {
error: format!("round {} started while we were on {}",
event.round_seq(), round_seq,
),
};
return true;
}
if event.attempt_seq() < attempt_seq {
trace!("ignoring replayed message from old attempt");
return false;
}
if let RoundEvent::Attempt(e) = event && e.attempt_seq > attempt_seq {
trace!("Joining new round attempt {}:{}", e.round_seq, e.attempt_seq);
self.try_start_attempt(wallet, e).await;
return true;
}
trace!("Processing event {} for round attempt {}:{} in state {}",
event.kind(), round_seq, attempt_seq, state.kind(),
);
return match progress_attempt(state, wallet, &self.participation, event).await {
AttemptProgressResult::NotUpdated => false,
AttemptProgressResult::Updated { new_state } => {
*state = new_state;
true
},
AttemptProgressResult::Failed(e) => {
warn!("Round failed with error: {:#}", e);
self.flow = RoundFlowState::Failed {
error: format!("{:#}", e),
};
true
},
AttemptProgressResult::Finished { funding_tx, vtxos, unlock_hash } => {
self.new_vtxos = vtxos;
let funding_txid = funding_tx.compute_txid();
self.flow = RoundFlowState::Finished { funding_tx, unlock_hash };
if let Some(mid) = self.movement_id {
if let Err(e) = update_funding_txid(wallet, mid, funding_txid).await {
warn!("Error updating the round funding txid: {:#}", e);
}
}
true
},
};
},
RoundFlowState::NonInteractivePending { .. }
| RoundFlowState::Finished { .. }
| RoundFlowState::Failed { .. }
| RoundFlowState::Canceled => return false,
};
}
pub async fn sync(&mut self, wallet: &Wallet) -> anyhow::Result<RoundStatus> {
match self.flow {
RoundFlowState::Finished { ref funding_tx, .. } if self.done => {
Ok(RoundStatus::Confirmed {
funding_txid: funding_tx.compute_txid(),
})
},
RoundFlowState::InteractivePending | RoundFlowState::InteractiveOngoing { .. } => {
Ok(RoundStatus::Pending)
},
RoundFlowState::Failed { ref error } => {
persist_round_failure(wallet, &self.participation, self.movement_id).await
.context("failed to persist round failure")?;
Ok(RoundStatus::Failed { error: error.clone() })
},
RoundFlowState::Canceled => {
persist_round_failure(wallet, &self.participation, self.movement_id).await
.context("failed to persist round failure")?;
Ok(RoundStatus::Canceled)
},
RoundFlowState::NonInteractivePending { unlock_hash } => {
match progress_non_interactive(wallet, &self.participation, unlock_hash).await {
Ok(HarkProgressResult::RoundPending) => Ok(RoundStatus::Pending),
Ok(HarkProgressResult::RoundNotFound) => {
self.handle_round_not_found(wallet).await
},
Ok(HarkProgressResult::Ok { funding_tx, new_vtxos }) => {
let funding_txid = funding_tx.compute_txid();
self.new_vtxos = new_vtxos;
self.flow = RoundFlowState::Finished {
funding_tx: funding_tx.clone(),
unlock_hash: unlock_hash,
};
persist_round_success(
wallet,
&self.participation,
self.movement_id,
&self.new_vtxos,
&funding_tx,
).await.context("failed to store successful round in DB!")?;
self.done = true;
Ok(RoundStatus::Confirmed { funding_txid })
},
Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }) => {
if let Some(mid) = self.movement_id {
update_funding_txid(wallet, mid, funding_txid).await
.context("failed to update funding txid in DB")?;
}
Ok(RoundStatus::Unconfirmed { funding_txid })
},
Err(HarkForfeitError::Err(e)) => {
Err(e.context("error progressing non-interactive round"))
},
Err(HarkForfeitError::SentForfeits(e)) => {
self.sent_forfeit_sigs = true;
Err(e.context("error progressing non-interactive round \
after sending forfeit tx signatures"))
},
}
},
RoundFlowState::Finished { ref funding_tx, unlock_hash } => {
let funding_txid = funding_tx.compute_txid();
let confirmed = check_funding_tx_confirmations(
wallet, funding_txid, &funding_tx,
).await.context("error checking funding tx confirmations")?;
if !confirmed {
trace!("Funding tx {} not yet deeply enough confirmed", funding_txid);
return Ok(RoundStatus::Unconfirmed { funding_txid });
}
match hark_vtxo_swap(
wallet, &self.participation, &mut self.new_vtxos, &funding_tx, unlock_hash,
).await {
Ok(()) => {
persist_round_success(
wallet,
&self.participation,
self.movement_id,
&self.new_vtxos,
&funding_tx,
).await.context("failed to store successful round in DB!")?;
self.done = true;
Ok(RoundStatus::Confirmed { funding_txid })
},
Err(HarkForfeitError::Err(e)) => {
Err(e.context("error forfeiting VTXOs after round"))
},
Err(HarkForfeitError::SentForfeits(e)) => {
self.sent_forfeit_sigs = true;
Err(e.context("error after having signed and sent \
forfeit signatures to server"))
},
}
},
}
}
pub fn output_vtxos(&self) -> Option<&[Vtxo<Full>]> {
if self.new_vtxos.is_empty() {
None
} else {
Some(&self.new_vtxos)
}
}
pub fn locked_pending_inputs(&self) -> &[Vtxo<Full>] {
match self.flow {
RoundFlowState::NonInteractivePending { .. }
| RoundFlowState::InteractivePending
| RoundFlowState::InteractiveOngoing { .. }
=> {
&self.participation.inputs
},
RoundFlowState::Finished { .. } => if self.done {
&[]
} else {
&self.participation.inputs
},
RoundFlowState::Failed { .. }
| RoundFlowState::Canceled
=> {
&[]
},
}
}
pub fn pending_balance(&self) -> Amount {
if self.done {
return Amount::ZERO;
}
match self.flow {
RoundFlowState::NonInteractivePending { .. }
| RoundFlowState::InteractivePending
| RoundFlowState::InteractiveOngoing { .. }
| RoundFlowState::Finished { .. }
=> {
self.participation.outputs.iter().map(|o| o.amount).sum()
},
RoundFlowState::Failed { .. } | RoundFlowState::Canceled => {
Amount::ZERO
},
}
}
async fn handle_round_not_found(
&mut self,
wallet: &Wallet,
) -> anyhow::Result<RoundStatus> {
info!("Server reports round participation not found (no forfeits sent)");
self.flow = RoundFlowState::Failed {
error: "server reports round participation not found".into(),
};
persist_round_failure(wallet, &self.participation, self.movement_id).await
.context("failed to persist round failure")?;
Ok(RoundStatus::Failed {
error: "server reports round participation not found".into(),
})
}
}
pub enum RoundFlowState {
NonInteractivePending {
unlock_hash: UnlockHash,
},
InteractivePending,
InteractiveOngoing {
round_seq: RoundSeq,
attempt_seq: usize,
state: AttemptState,
},
Finished {
funding_tx: Transaction,
unlock_hash: UnlockHash,
},
Failed {
error: String,
},
Canceled,
}
pub enum AttemptState {
AwaitingAttempt,
AwaitingUnsignedVtxoTree {
cosign_keys: Vec<Keypair>,
secret_nonces: Vec<Vec<DangerousSecretNonce>>,
unlock_hash: UnlockHash,
},
AwaitingFinishedRound {
unsigned_round_tx: Transaction,
vtxos_spec: VtxoTreeSpec,
unlock_hash: UnlockHash,
},
}
impl AttemptState {
fn kind(&self) -> &'static str {
match self {
Self::AwaitingAttempt => "AwaitingAttempt",
Self::AwaitingUnsignedVtxoTree { .. } => "AwaitingUnsignedVtxoTree",
Self::AwaitingFinishedRound { .. } => "AwaitingFinishedRound",
}
}
}
enum AttemptProgressResult {
Finished {
funding_tx: Transaction,
vtxos: Vec<Vtxo<Full>>,
unlock_hash: UnlockHash,
},
Failed(anyhow::Error),
Updated {
new_state: AttemptState,
},
NotUpdated,
}
async fn start_attempt(
wallet: &Wallet,
participation: &RoundParticipation,
event: &RoundAttempt,
) -> anyhow::Result<AttemptState> {
let (mut srv, ark_info) = wallet.require_server().await.context("server not available")?;
let cosign_keys = iter::repeat_with(|| Keypair::new(&SECP, &mut rand::thread_rng()))
.take(participation.outputs.len())
.collect::<Vec<_>>();
let cosign_nonces = cosign_keys.iter()
.map(|key| {
let mut secs = Vec::with_capacity(ark_info.nb_round_nonces);
let mut pubs = Vec::with_capacity(ark_info.nb_round_nonces);
for _ in 0..ark_info.nb_round_nonces {
let (s, p) = musig::nonce_pair(key);
secs.push(s);
pubs.push(p);
}
(secs, pubs)
})
.take(participation.outputs.len())
.collect::<Vec<(Vec<SecretNonce>, Vec<PublicNonce>)>>();
debug!("Submitting payment request with {} inputs and {} vtxo outputs",
participation.inputs.len(), participation.outputs.len(),
);
let unblinded_mailbox_id = wallet.mailbox_identifier();
let signed_reqs = participation.outputs.iter()
.zip(cosign_keys.iter())
.zip(cosign_nonces.iter())
.map(|((req, cosign_key), (_sec, pub_nonces))| {
SignedVtxoRequest {
vtxo: req.clone(),
cosign_pubkey: cosign_key.public_key(),
nonces: pub_nonces.clone(),
}
})
.collect::<Vec<_>>();
let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
for vtxo in participation.inputs.iter() {
let keypair = wallet.get_vtxo_key(vtxo).await
.map_err(HarkForfeitError::Err)?;
input_vtxos.push(protos::InputVtxo {
vtxo_id: vtxo.id().to_bytes().to_vec(),
attestation: {
let attestation = RoundAttemptAttestation::new(
event.challenge, vtxo.id(), &signed_reqs, &keypair,
);
attestation.serialize()
},
});
}
wallet.register_vtxo_transactions_with_server(&participation.inputs).await
.map_err(HarkForfeitError::Err)?;
let resp = srv.client.submit_payment(protos::SubmitPaymentRequest {
input_vtxos: input_vtxos,
vtxo_requests: signed_reqs.into_iter().map(Into::into).collect(),
#[allow(deprecated)]
offboard_requests: vec![],
unblinded_mailbox_id: Some(unblinded_mailbox_id.to_vec()),
}).await.context("Ark server refused our payment submission")?;
Ok(AttemptState::AwaitingUnsignedVtxoTree {
unlock_hash: UnlockHash::from_bytes(&resp.into_inner().unlock_hash)?,
cosign_keys: cosign_keys,
secret_nonces: cosign_nonces.into_iter()
.map(|(sec, _pub)| sec.into_iter()
.map(DangerousSecretNonce::dangerous_from_secret_nonce)
.collect())
.collect(),
})
}
#[derive(Debug, thiserror::Error)]
enum HarkForfeitError {
#[error("error after forfeits were sent")]
SentForfeits(#[source] anyhow::Error),
#[error("error before forfeits were sent")]
Err(#[source] anyhow::Error),
}
async fn hark_cosign_leaf(
wallet: &Wallet,
srv: &mut ServerConnection,
funding_tx: &Transaction,
vtxo: &mut Vtxo<Full>,
) -> anyhow::Result<()> {
let key = wallet.pubkey_keypair(&vtxo.user_pubkey()).await
.context("error fetching keypair").map_err(HarkForfeitError::Err)?
.with_context(|| format!(
"keypair {} not found for VTXO {}", vtxo.user_pubkey(), vtxo.id(),
))?.1;
let (ctx, cosign_req) = LeafVtxoCosignContext::new(vtxo, funding_tx, &key);
let cosign_resp = srv.client.request_leaf_vtxo_cosign(
protos::LeafVtxoCosignRequest::from(cosign_req),
).await
.with_context(|| format!("error requesting leaf cosign for vtxo {}", vtxo.id()))?
.into_inner().try_into()
.context("bad leaf vtxo cosign response")?;
ensure!(ctx.finalize(vtxo, cosign_resp),
"failed to finalize VTXO leaf signature for VTXO {}", vtxo.id(),
);
Ok(())
}
async fn hark_vtxo_swap(
wallet: &Wallet,
participation: &RoundParticipation,
output_vtxos: &mut [Vtxo<Full>],
funding_tx: &Transaction,
unlock_hash: UnlockHash,
) -> Result<(), HarkForfeitError> {
let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
wallet.register_vtxo_transactions_with_server(&participation.inputs).await
.context("couldn't send our input vtxo transactions to server")
.map_err(HarkForfeitError::Err)?;
for vtxo in output_vtxos.iter_mut() {
hark_cosign_leaf(wallet, &mut srv, funding_tx, vtxo).await
.map_err(HarkForfeitError::Err)?;
}
let server_nonces = srv.client.request_forfeit_nonces(protos::ForfeitNoncesRequest {
unlock_hash: unlock_hash.to_byte_array().to_vec(),
vtxo_ids: participation.inputs.iter().map(|v| v.id().to_bytes().to_vec()).collect(),
}).await
.context("request forfeits nonces call failed")
.map_err(HarkForfeitError::Err)?
.into_inner().public_nonces.into_iter()
.map(|b| musig::PublicNonce::from_bytes(b))
.collect::<Result<Vec<_>, _>>()
.context("invalid forfeit nonces")
.map_err(HarkForfeitError::Err)?;
if server_nonces.len() != participation.inputs.len() {
return Err(HarkForfeitError::Err(anyhow!(
"server sent {} nonce pairs, expected {}",
server_nonces.len(), participation.inputs.len(),
)));
}
let mut forfeit_bundles = Vec::with_capacity(participation.inputs.len());
for (input, nonces) in participation.inputs.iter().zip(server_nonces.into_iter()) {
let user_key = wallet.pubkey_keypair(&input.user_pubkey()).await
.ok().flatten().with_context(|| format!(
"failed to fetch keypair for vtxo user pubkey {}", input.user_pubkey(),
)).map_err(HarkForfeitError::Err)?.1;
forfeit_bundles.push(HashLockedForfeitBundle::new(
input, unlock_hash, &user_key, &nonces,
))
}
let preimage = srv.client.forfeit_vtxos(protos::ForfeitVtxosRequest {
forfeit_bundles: forfeit_bundles.iter().map(|b| b.serialize()).collect(),
}).await
.context("forfeit vtxos call failed")
.map_err(HarkForfeitError::SentForfeits)?
.into_inner().unlock_preimage.as_slice().try_into()
.context("invalid preimage length")
.map_err(HarkForfeitError::SentForfeits)?;
for vtxo in output_vtxos.iter_mut() {
if !vtxo.provide_unlock_preimage(preimage) {
return Err(HarkForfeitError::SentForfeits(anyhow!(
"invalid preimage {} for vtxo {} with supposed unlock hash {}",
preimage.as_hex(), vtxo.id(), unlock_hash,
)));
}
vtxo.validate(&funding_tx).with_context(|| format!(
"new VTXO {} does not pass validation after hArk forfeit protocol", vtxo.id(),
)).map_err(HarkForfeitError::SentForfeits)?;
}
Ok(())
}
fn check_vtxo_fails_hash_lock(funding_tx: &Transaction, vtxo: &Vtxo<Full>) -> anyhow::Result<()> {
match vtxo.validate(funding_tx) {
Err(VtxoValidationError::GenesisTransition {
genesis_idx, genesis_len, transition_kind, ..
}) if genesis_idx + 1 == genesis_len && transition_kind == HARK_TRANSITION_KIND => Ok(()),
Ok(()) => Err(anyhow!("new un-unlocked VTXO should fail validation but doesn't: {}",
vtxo.serialize_hex(),
)),
Err(e) => Err(anyhow!("new VTXO {} failed validation: {:#}", vtxo.id(), e)),
}
}
fn check_round_matches_participation(
part: &RoundParticipation,
new_vtxos: &[Vtxo<Full>],
funding_tx: &Transaction,
) -> anyhow::Result<()> {
ensure!(new_vtxos.len() == part.outputs.len(),
"unexpected number of VTXOs: got {}, expected {}", new_vtxos.len(), part.outputs.len(),
);
for (vtxo, req) in new_vtxos.iter().zip(&part.outputs) {
ensure!(vtxo.amount() == req.amount,
"unexpected VTXO amount: got {}, expected {}", vtxo.amount(), req.amount,
);
ensure!(*vtxo.policy() == req.policy,
"unexpected VTXO policy: got {:?}, expected {:?}", vtxo.policy(), req.policy,
);
check_vtxo_fails_hash_lock(funding_tx, vtxo)?;
}
Ok(())
}
async fn check_funding_tx_confirmations(
wallet: &Wallet,
funding_txid: Txid,
funding_tx: &Transaction,
) -> anyhow::Result<bool> {
let tip = wallet.chain.tip().await.context("chain source error")?;
let conf_height = tip - wallet.config.round_tx_required_confirmations + 1;
let tx_status = wallet.chain.tx_status(funding_txid).await.context("chain source error")?;
trace!("Round funding tx {} confirmation status: {:?} (tip={})",
funding_txid, tx_status, tip,
);
match tx_status {
TxStatus::Confirmed(b) if b.height <= conf_height => Ok(true),
TxStatus::Mempool | TxStatus::Confirmed(_) => {
if wallet.config.round_tx_required_confirmations == 0 {
debug!("Accepting round funding tx without confirmations because of configuration");
Ok(true)
} else {
trace!("Hark round funding tx not confirmed (deep enough) yet: {:?}", tx_status);
Ok(false)
}
},
TxStatus::NotFound => {
if let Err(e) = wallet.chain.broadcast_tx(&funding_tx).await {
Err(anyhow!("hark funding tx {} server sent us is rejected by mempool (hex={}): {:#}",
funding_txid, serialize_hex(funding_tx), e,
))
} else {
trace!("hark funding tx {} was not in mempool but we broadcast it", funding_txid);
Ok(false)
}
},
}
}
enum HarkProgressResult {
RoundPending,
RoundNotFound,
FundingTxUnconfirmed {
funding_txid: Txid,
},
Ok {
funding_tx: Transaction,
new_vtxos: Vec<Vtxo<Full>>,
},
}
async fn progress_non_interactive(
wallet: &Wallet,
participation: &RoundParticipation,
unlock_hash: UnlockHash,
) -> Result<HarkProgressResult, HarkForfeitError> {
let (mut srv, _) = wallet.require_server().await.map_err(HarkForfeitError::Err)?;
let resp = match srv.client.round_participation_status(protos::RoundParticipationStatusRequest {
unlock_hash: unlock_hash.to_byte_array().to_vec(),
}).await {
Ok(resp) => resp.into_inner(),
Err(err) if err.code() == tonic::Code::NotFound => {
return Ok(HarkProgressResult::RoundNotFound);
},
Err(err) => {
return Err(HarkForfeitError::Err(
anyhow::Error::from(err).context("error checking round participation status"),
));
},
};
let status = protos::RoundParticipationStatus::try_from(resp.status)
.context("unknown status from server")
.map_err(HarkForfeitError::Err) ?;
if status == protos::RoundParticipationStatus::RoundPartPending {
trace!("Hark round still pending");
return Ok(HarkProgressResult::RoundPending);
}
if status == protos::RoundParticipationStatus::RoundPartReleased {
let preimage = resp.unlock_preimage.as_ref().map(|p| p.as_hex());
warn!("Server says preimage was already released for hArk participation \
with unlock hash {}. Supposed preimage: {:?}", unlock_hash, preimage,
);
}
let funding_tx_bytes = resp.round_funding_tx
.context("funding txid should be provided when status is not pending")
.map_err(HarkForfeitError::Err)?;
let funding_tx = deserialize::<Transaction>(&funding_tx_bytes)
.context("invalid funding txid")
.map_err(HarkForfeitError::Err)?;
let funding_txid = funding_tx.compute_txid();
trace!("Funding tx for round participation with unlock hash {}: {} ({})",
unlock_hash, funding_tx.compute_txid(), funding_tx_bytes.as_hex(),
);
match check_funding_tx_confirmations(wallet, funding_txid, &funding_tx).await {
Ok(true) => {},
Ok(false) => return Ok(HarkProgressResult::FundingTxUnconfirmed { funding_txid }),
Err(e) => return Err(HarkForfeitError::Err(e.context("checking funding tx confirmations"))),
}
let mut new_vtxos = resp.output_vtxos.into_iter()
.map(|v| <Vtxo<Full>>::deserialize(&v))
.collect::<Result<Vec<_>, _>>()
.context("invalid output VTXOs from server")
.map_err(HarkForfeitError::Err)?;
check_round_matches_participation(participation, &new_vtxos, &funding_tx)
.context("new VTXOs received from server don't match our participation")
.map_err(HarkForfeitError::Err)?;
hark_vtxo_swap(wallet, participation, &mut new_vtxos, &funding_tx, unlock_hash).await
.context("error forfeiting hArk VTXOs")
.map_err(HarkForfeitError::SentForfeits)?;
Ok(HarkProgressResult::Ok { funding_tx, new_vtxos })
}
async fn progress_attempt(
state: &AttemptState,
wallet: &Wallet,
part: &RoundParticipation,
event: &RoundEvent,
) -> AttemptProgressResult {
match (state, event) {
(
AttemptState::AwaitingUnsignedVtxoTree { cosign_keys, secret_nonces, unlock_hash },
RoundEvent::VtxoProposal(e),
) => {
trace!("Received VtxoProposal: {:#?}", e);
match sign_vtxo_tree(
wallet,
part,
&cosign_keys,
&secret_nonces,
&e.unsigned_round_tx,
&e.vtxos_spec,
&e.cosign_agg_nonces,
).await {
Ok(()) => {
AttemptProgressResult::Updated {
new_state: AttemptState::AwaitingFinishedRound {
unsigned_round_tx: e.unsigned_round_tx.clone(),
vtxos_spec: e.vtxos_spec.clone(),
unlock_hash: *unlock_hash,
},
}
},
Err(e) => {
trace!("Error signing VTXO tree: {:#}", e);
AttemptProgressResult::Failed(e)
},
}
},
(
AttemptState::AwaitingFinishedRound { unsigned_round_tx, vtxos_spec, unlock_hash },
RoundEvent::Finished(RoundFinished { cosign_sigs, signed_round_tx, .. }),
) => {
if unsigned_round_tx.compute_txid() != signed_round_tx.compute_txid() {
return AttemptProgressResult::Failed(anyhow!(
"signed funding tx ({}) doesn't match tx received before ({})",
signed_round_tx.compute_txid(), unsigned_round_tx.compute_txid(),
));
}
if let Err(e) = wallet.chain.broadcast_tx(&signed_round_tx).await {
warn!("Failed to broadcast signed round tx: {:#}", e);
}
match construct_new_vtxos(
part, unsigned_round_tx, vtxos_spec, cosign_sigs,
).await {
Ok(v) => AttemptProgressResult::Finished {
funding_tx: signed_round_tx.clone(),
vtxos: v,
unlock_hash: *unlock_hash,
},
Err(e) => AttemptProgressResult::Failed(anyhow!(
"failed to construct new VTXOs for round: {:#}", e,
)),
}
},
(state, RoundEvent::Finished(RoundFinished { .. })) => {
AttemptProgressResult::Failed(anyhow!(
"unexpectedly received a finished round while we were in state {}",
state.kind(),
))
},
(state, _) => {
trace!("Ignoring round event {} in state {}", event.kind(), state.kind());
AttemptProgressResult::NotUpdated
},
}
}
async fn sign_vtxo_tree(
wallet: &Wallet,
participation: &RoundParticipation,
cosign_keys: &[Keypair],
secret_nonces: &[impl AsRef<[DangerousSecretNonce]>],
unsigned_round_tx: &Transaction,
vtxo_tree: &VtxoTreeSpec,
cosign_agg_nonces: &[musig::AggregatedNonce],
) -> anyhow::Result<()> {
let (srv, _) = wallet.require_server().await.context("server not available")?;
let vtxos_utxo = OutPoint::new(unsigned_round_tx.compute_txid(), ROUND_TX_VTXO_TREE_VOUT);
let mut my_vtxos = participation.outputs.iter().collect::<Vec<_>>();
for vtxo_req in vtxo_tree.iter_vtxos() {
if let Some(i) = my_vtxos.iter().position(|v| {
v.policy == vtxo_req.vtxo.policy && v.amount == vtxo_req.vtxo.amount
}) {
my_vtxos.swap_remove(i);
}
}
if !my_vtxos.is_empty() {
bail!("server didn't include all of our vtxos, missing: {:?}", my_vtxos);
}
let unsigned_vtxos = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
let iter = participation.outputs.iter().zip(cosign_keys).zip(secret_nonces);
trace!("Sending vtxo signatures to server...");
let _ = try_join_all(iter.map(|((req, key), sec)| async {
let leaf_idx = unsigned_vtxos.spec.leaf_idx_of_req(req).expect("req included");
let secret_nonces = sec.as_ref().iter().map(|s| s.to_sec_nonce()).collect();
let part_sigs = unsigned_vtxos.cosign_branch(
&cosign_agg_nonces, leaf_idx, key, secret_nonces,
).context("failed to cosign branch: our request not part of tree")?;
info!("Sending {} partial vtxo cosign signatures for pk {}",
part_sigs.len(), key.public_key(),
);
let _ = srv.client.clone().provide_vtxo_signatures(protos::VtxoSignaturesRequest {
pubkey: key.public_key().serialize().to_vec(),
signatures: part_sigs.iter().map(|s| s.serialize().to_vec()).collect(),
}).await.context("error sending vtxo signatures")?;
Result::<(), anyhow::Error>::Ok(())
})).await.context("error sending VTXO signatures")?;
trace!("Done sending vtxo signatures to server");
Ok(())
}
async fn construct_new_vtxos(
participation: &RoundParticipation,
unsigned_round_tx: &Transaction,
vtxo_tree: &VtxoTreeSpec,
vtxo_cosign_sigs: &[schnorr::Signature],
) -> anyhow::Result<Vec<Vtxo<Full>>> {
let round_txid = unsigned_round_tx.compute_txid();
let vtxos_utxo = OutPoint::new(round_txid, ROUND_TX_VTXO_TREE_VOUT);
let vtxo_tree = vtxo_tree.clone().into_unsigned_tree(vtxos_utxo);
if vtxo_tree.verify_cosign_sigs(&vtxo_cosign_sigs).is_err() {
bail!("Received incorrect vtxo cosign signatures from server");
}
let signed_vtxos = vtxo_tree
.into_signed_tree(vtxo_cosign_sigs.to_vec())
.into_cached_tree();
let mut expected_vtxos = participation.outputs.iter().collect::<Vec<_>>();
let total_nb_expected_vtxos = expected_vtxos.len();
let mut new_vtxos = vec![];
for (idx, req) in signed_vtxos.spec.spec.vtxos.iter().enumerate() {
if let Some(expected_idx) = expected_vtxos.iter().position(|r| **r == req.vtxo) {
let vtxo = signed_vtxos.build_vtxo(idx);
check_vtxo_fails_hash_lock(unsigned_round_tx, &vtxo)
.context("constructed invalid vtxo from tree")?;
info!("New VTXO from round: {} ({}, {})",
vtxo.id(), vtxo.amount(), vtxo.policy_type(),
);
new_vtxos.push(vtxo);
expected_vtxos.swap_remove(expected_idx);
}
}
if !expected_vtxos.is_empty() {
if expected_vtxos.len() == total_nb_expected_vtxos {
bail!("None of our VTXOs were present in round!");
} else {
bail!("Server included some of our VTXOs but not all: {} missing: {:?}",
expected_vtxos.len(), expected_vtxos,
);
}
}
Ok(new_vtxos)
}
async fn persist_round_success(
wallet: &Wallet,
participation: &RoundParticipation,
movement_id: Option<MovementId>,
new_vtxos: &[Vtxo<Full>],
funding_tx: &Transaction,
) -> anyhow::Result<()> {
debug!("Persisting newly finished round. {} new vtxos, movement ID {:?}",
new_vtxos.len(), movement_id,
);
let store_result = wallet.store_spendable_vtxos(new_vtxos).await
.context("failed to store new VTXOs");
let spent_result = wallet.mark_vtxos_as_spent(&participation.inputs).await
.context("failed to mark input VTXOs as spent");
let update_result = if let Some(mid) = movement_id {
wallet.movements.finish_movement_with_update(
mid,
MovementStatus::Successful,
MovementUpdate::new()
.produced_vtxos(new_vtxos)
.metadata([("funding_txid".into(), serde_json::to_value(funding_tx.compute_txid())?)]),
).await.context("failed to mark movement as finished")
} else {
Ok(())
};
store_result?;
spent_result?;
update_result?;
Ok(())
}
async fn persist_round_failure(
wallet: &Wallet,
participation: &RoundParticipation,
movement_id: Option<MovementId>,
) -> anyhow::Result<()> {
debug!("Attempting to persist the failure of a round with the movement ID {:?}", movement_id);
let unlock_result = wallet.unlock_vtxos(&participation.inputs).await;
let finish_result = if let Some(movement_id) = movement_id {
wallet.movements.finish_movement(movement_id, MovementStatus::Failed).await
} else {
Ok(())
};
if let Err(e) = &finish_result {
error!("Failed to mark movement as failed: {:#}", e);
}
match (unlock_result, finish_result) {
(Ok(()), Ok(())) => Ok(()),
(Err(e), _) => Err(e),
(_, Err(e)) => Err(anyhow!("Failed to mark movement as failed: {:#}", e)),
}
}
async fn update_funding_txid(
wallet: &Wallet,
movement_id: MovementId,
funding_txid: Txid,
) -> anyhow::Result<()> {
wallet.movements.update_movement(
movement_id,
MovementUpdate::new()
.metadata([("funding_txid".into(), serde_json::to_value(&funding_txid)?)])
).await.context("Unable to update funding txid of round")
}
impl Wallet {
pub async fn next_round_start_time(&self) -> anyhow::Result<SystemTime> {
let (mut srv, _) = self.require_server().await?;
let ts = srv.client.next_round_time(protos::Empty {}).await?.into_inner().timestamp;
Ok(UNIX_EPOCH.checked_add(Duration::from_secs(ts)).context("invalid timestamp")?)
}
pub async fn join_next_round(
&self,
participation: RoundParticipation,
movement_kind: Option<RoundMovement>,
) -> anyhow::Result<StoredRoundState> {
let movement_id = if let Some(kind) = movement_kind {
Some(self.movements.new_movement_with_update(
Subsystem::ROUND,
kind.to_string(),
participation.to_movement_update()?
).await?)
} else {
None
};
let state = RoundState::new_interactive(participation, movement_id);
let id = self.db.store_round_state_lock_vtxos(&state).await?;
let state = self.lock_wait_round_state(id).await?
.context("failed to lock fresh round state")?;
Ok(state)
}
pub async fn join_next_round_delegated(
&self,
participation: RoundParticipation,
movement_kind: Option<RoundMovement>,
) -> anyhow::Result<StoredRoundState<Unlocked>> {
let (mut srv, _) = self.require_server().await?;
let movement_id = if let Some(kind) = movement_kind {
Some(self.movements.new_movement_with_update(
Subsystem::ROUND, kind.to_string(), participation.to_movement_update()?,
).await?)
} else {
None
};
let unblinded_mailbox_id = self.mailbox_identifier();
let mut input_vtxos = Vec::with_capacity(participation.inputs.len());
for vtxo in participation.inputs.iter() {
let keypair = self.get_vtxo_key(vtxo).await
.context("failed to get vtxo keypair")?;
input_vtxos.push(protos::InputVtxo {
vtxo_id: vtxo.id().to_bytes().to_vec(),
attestation: {
let attestation = DelegatedRoundParticipationAttestation::new(
vtxo.id(), &participation.outputs, &keypair,
);
attestation.serialize()
},
});
}
let vtxo_requests = participation.outputs.iter()
.map(|req|
protos::VtxoRequest {
policy: req.policy.serialize(),
amount: req.amount.to_sat(),
})
.collect::<Vec<_>>();
let resp = srv.client.submit_round_participation(protos::RoundParticipationRequest {
input_vtxos,
vtxo_requests,
unblinded_mailbox_id: Some(unblinded_mailbox_id.to_vec()),
}).await.context("error submitting round participation to server")?.into_inner();
let unlock_hash = UnlockHash::from_bytes(resp.unlock_hash)
.context("invalid unlock hash from server")?;
let state = RoundState::new_non_interactive(participation, unlock_hash, movement_id);
info!("Non-interactive round participation submitted, it will automatically execute \
when you next sync your wallet after the round happened \
(and has sufficient confirmations).",
);
let id = self.db.store_round_state_lock_vtxos(&state).await?;
Ok(StoredRoundState::new(id, state))
}
pub async fn pending_round_state_ids(&self) -> anyhow::Result<Vec<RoundStateId>> {
self.db.get_pending_round_state_ids().await
}
pub async fn pending_round_states(&self) -> anyhow::Result<Vec<StoredRoundState<Unlocked>>> {
let ids = self.db.get_pending_round_state_ids().await?;
let mut states = Vec::with_capacity(ids.len());
for id in ids {
if let Some(state) = self.db.get_round_state_by_id(id).await? {
states.push(state);
}
}
Ok(states)
}
pub async fn pending_round_balance(&self) -> anyhow::Result<Amount> {
let mut ret = Amount::ZERO;
for round in self.pending_round_states().await? {
ret += round.state().pending_balance();
}
Ok(ret)
}
pub async fn pending_round_input_vtxos(&self) -> anyhow::Result<Vec<WalletVtxo>> {
let mut ret = Vec::new();
for round in self.pending_round_states().await? {
let inputs = round.state().locked_pending_inputs();
ret.reserve(inputs.len());
for input in inputs {
let v = self.get_vtxo_by_id(input.id()).await
.context("unknown round input VTXO")?;
ret.push(v);
}
}
Ok(ret)
}
pub async fn sync_pending_rounds(&self) -> anyhow::Result<()> {
let states = self.pending_round_states().await?;
if states.is_empty() {
return Ok(());
}
debug!("Syncing {} pending round states...", states.len());
tokio_stream::iter(states).for_each_concurrent(10, |state| async move {
if state.state().ongoing_participation() {
return;
}
let mut state = match self.lock_wait_round_state(state.id()).await {
Ok(Some(state)) => state,
Ok(None) => return,
Err(e) => {
warn!("Error locking round state: {:#}", e);
return;
},
};
let status = state.state_mut().sync(self).await;
trace!("Synced round #{}, status: {:?}", state.id(), status);
match status {
Ok(RoundStatus::Confirmed { funding_txid }) => {
info!("Round confirmed. Funding tx {}", funding_txid);
if let Err(e) = self.db.remove_round_state(&state).await {
warn!("Error removing confirmed round state from db: {:#}", e);
}
},
Ok(RoundStatus::Unconfirmed { funding_txid }) => {
info!("Waiting for confirmations for round funding tx {}", funding_txid);
if let Err(e) = self.db.update_round_state(&state).await {
warn!("Error updating pending round state in db: {:#}", e);
}
},
Ok(RoundStatus::Pending) => {
if let Err(e) = self.db.update_round_state(&state).await {
warn!("Error updating pending round state in db: {:#}", e);
}
},
Ok(RoundStatus::Failed { error }) => {
error!("Round failed: {}", error);
if let Err(e) = self.db.remove_round_state(&state).await {
warn!("Error removing failed round state from db: {:#}", e);
}
},
Ok(RoundStatus::Canceled) => {
error!("Round canceled");
if let Err(e) = self.db.remove_round_state(&state).await {
warn!("Error removing canceled round state from db: {:#}", e);
}
},
Err(e) => warn!("Error syncing round: {:#}", e),
}
}).await;
Ok(())
}
async fn get_last_round_event(&self) -> anyhow::Result<RoundEvent> {
let (mut srv, _) = self.require_server().await?;
let e = srv.client.last_round_event(protos::Empty {}).await?.into_inner();
Ok(RoundEvent::try_from(e).context("invalid event format from server")?)
}
async fn inner_process_event(
&self,
state: &mut StoredRoundState,
event: Option<&RoundEvent>,
) {
if let Some(event) = event && state.state().ongoing_participation() {
let updated = state.state_mut().process_event(self, &event).await;
if updated {
if let Err(e) = self.db.update_round_state(&state).await {
error!("Error storing round state #{} after progress: {:#}", state.id(), e);
}
}
}
match state.state_mut().sync(self).await {
Err(e) => warn!("Error syncing round #{}: {:#}", state.id(), e),
Ok(s) if s.is_final() => {
info!("Round #{} finished with result: {:?}", state.id(), s);
if let Err(e) = self.db.remove_round_state(&state).await {
warn!("Failed to remove finished round #{} from db: {:#}", state.id(), e);
}
},
Ok(s) => {
trace!("Round state #{} is now in state {:?}", state.id(), s);
if let Err(e) = self.db.update_round_state(&state).await {
warn!("Error storing round state #{}: {:#}", state.id(), e);
}
},
}
}
pub async fn progress_pending_rounds(
&self,
last_round_event: Option<&RoundEvent>,
) -> anyhow::Result<()> {
let states = self.pending_round_states().await?;
if states.is_empty() {
return Ok(());
}
info!("Processing {} rounds...", states.len());
let mut last_round_event = last_round_event.map(|e| Cow::Borrowed(e));
let has_ongoing_participation = states.iter()
.any(|s| s.state().ongoing_participation());
if has_ongoing_participation && last_round_event.is_none() {
match self.get_last_round_event().await {
Ok(e) => last_round_event = Some(Cow::Owned(e)),
Err(e) => {
warn!("Error fetching round event, \
failed to progress ongoing rounds: {:#}", e);
},
}
}
let event = last_round_event.as_ref().map(|c| c.as_ref());
let futs = states.into_iter().map(async |state| {
let locked = self.lock_wait_round_state(state.id()).await?;
if let Some(mut locked) = locked {
self.inner_process_event(&mut locked, event).await;
}
Ok::<_, anyhow::Error>(())
});
futures::future::join_all(futs).await;
Ok(())
}
pub async fn subscribe_round_events(&self)
-> anyhow::Result<impl Stream<Item = anyhow::Result<RoundEvent>> + Unpin>
{
let (mut srv, _) = self.require_server().await?;
let events = srv.client.subscribe_rounds(protos::Empty {}).await?
.into_inner().map(|m| {
let m = m.context("received error on event stream")?;
let e = RoundEvent::try_from(m.clone())
.with_context(|| format!("error converting rpc round event: {:?}", m))?;
trace!("Received round event: {}", e);
Ok::<_, anyhow::Error>(e)
});
Ok(events)
}
pub async fn participate_ongoing_rounds(&self) -> anyhow::Result<()> {
let mut events = self.subscribe_round_events().await?;
loop {
let state_ids = self.pending_round_states().await?.iter()
.filter(|s| s.state().ongoing_participation())
.map(|s| s.id())
.collect::<Vec<_>>();
if state_ids.is_empty() {
info!("All rounds handled");
return Ok(());
}
let event = events.next().await
.context("events stream broke")?
.context("error on event stream")?;
let futs = state_ids.into_iter().map(async |state| {
let locked = self.lock_wait_round_state(state).await?;
if let Some(mut locked) = locked {
self.inner_process_event(&mut locked, Some(&event)).await;
}
Ok::<_, anyhow::Error>(())
});
futures::future::join_all(futs).await;
}
}
pub async fn cancel_all_pending_rounds(&self) -> anyhow::Result<()> {
let state_ids = self.db.get_pending_round_state_ids().await?;
let futures = state_ids.into_iter().map(|state_id| {
async move {
let mut state = match self.lock_wait_round_state(state_id).await {
Ok(Some(s)) => s,
Ok(None) => return,
Err(e) => return warn!("Error loading round state #{}: {:#}", state_id, e),
};
match state.state_mut().try_cancel(self).await {
Ok(true) => {
if let Err(e) = self.db.remove_round_state(&state).await {
warn!("Error removing canceled round state from db: {:#}", e);
}
},
Ok(false) => {},
Err(e) => warn!("Error trying to cancel round #{}: {:#}", state_id, e),
}
}
});
join_all(futures).await;
Ok(())
}
pub async fn cancel_pending_round(&self, id: RoundStateId) -> anyhow::Result<()> {
let mut state = self.lock_wait_round_state(id).await?
.context("round state not found")?;
if state.state_mut().try_cancel(self).await.context("failed to cancel round")? {
self.db.remove_round_state(&state).await
.context("error removing canceled round state from db")?;
} else {
bail!("failed to cancel round");
}
Ok(())
}
pub(crate) async fn participate_round(
&self,
participation: RoundParticipation,
movement_kind: Option<RoundMovement>,
) -> anyhow::Result<RoundStatus> {
let mut state = self.join_next_round(participation, movement_kind).await?;
info!("Waiting for a round start...");
let mut events = self.subscribe_round_events().await?;
loop {
if !state.state().ongoing_participation() {
let status = state.state_mut().sync(self).await?;
match status {
RoundStatus::Failed { error } => bail!("round failed: {}", error),
RoundStatus::Canceled => bail!("round canceled"),
status => return Ok(status),
}
}
let event = events.next().await
.context("events stream broke")?
.context("error on event stream")?;
if state.state_mut().process_event(self, &event).await {
self.db.update_round_state(&state).await?;
}
}
}
}