use std::str::FromStr;
use anyhow::Context;
use ark::arkoor::package::ArkoorPackageBuilder;
use bitcoin::{Amount, SignedAmount};
use bitcoin::hex::DisplayHex;
use futures::StreamExt;
use lightning_invoice::Bolt11Invoice;
use log::{trace, debug, info, warn};
use ark::{ProtocolEncoding, Vtxo, VtxoPolicy};
use ark::attestations::{LightningReceiveAttestation};
use ark::fees::validate_and_subtract_fee;
use ark::lightning::{Bolt11InvoiceExt, PaymentHash, Preimage};
use bitcoin_ext::{BlockDelta, BlockHeight};
use server_rpc::protos;
use server_rpc::protos::prepare_lightning_receive_claim_request::LightningReceiveAntiDos;
use crate::subsystem::{LightningMovement, LightningReceiveMovement, Subsystem};
use crate::{Wallet, error};
use crate::movement::{MovementDestination, MovementStatus};
use crate::movement::update::MovementUpdate;
use crate::persist::models::LightningReceive;
/// Extra block delta added on top of the server and wallet deltas when
/// computing the minimum CLTV delta requested for a lightning receive
/// invoice (see `Wallet::bolt11_invoice`).
///
/// NOTE(review): presumably a margin for the prepare-claim round trip with
/// the server — confirm the intended meaning of the value.
const LIGHTNING_PREPARE_CLAIM_DELTA: BlockDelta = 2;
impl Wallet {
/// Returns every lightning receive the database reports as pending.
///
/// # Errors
///
/// Fails when the database query fails.
pub async fn pending_lightning_receives(&self) -> anyhow::Result<Vec<LightningReceive>> {
    let pending = self.db.get_all_pending_lightning_receives().await?;
    Ok(pending)
}
/// Sums the amounts of all HTLC VTXOs attached to pending lightning receives.
///
/// Receives that have no HTLC VTXOs yet contribute zero.
///
/// # Errors
///
/// Fails when the pending receives cannot be loaded.
pub async fn claimable_lightning_receive_balance(&self) -> anyhow::Result<Amount> {
    let balance = self.pending_lightning_receives().await?
        .iter()
        // Per-receive subtotal over its HTLC VTXOs.
        .map(|receive| receive.htlc_vtxos.iter().map(|v| v.amount()).sum::<Amount>())
        .fold(Amount::ZERO, |acc, subtotal| acc + subtotal);
    Ok(balance)
}
pub async fn bolt11_invoice(&self, amount: Amount) -> anyhow::Result<Bolt11Invoice> {
if amount == Amount::ZERO {
bail!("Cannot create invoice for 0 sats (this would create an explicit 0 sat invoice, not an any-amount invoice)");
}
let (mut srv, ark_info) = self.require_server().await?;
let config = self.config();
let fee = ark_info.fees.lightning_receive.calculate(amount).context("fee overflowed")?;
validate_and_subtract_fee(amount, fee)?;
let requested_min_cltv_delta = ark_info.vtxo_exit_delta +
ark_info.htlc_expiry_delta +
config.vtxo_exit_margin +
config.htlc_recv_claim_delta +
LIGHTNING_PREPARE_CLAIM_DELTA;
if requested_min_cltv_delta > ark_info.max_user_invoice_cltv_delta {
bail!("HTLC CLTV delta ({}) is greater than Server's max HTLC recv CLTV delta: {}",
requested_min_cltv_delta,
ark_info.max_user_invoice_cltv_delta,
);
}
let preimage = Preimage::random();
let payment_hash = preimage.compute_payment_hash();
info!("Start bolt11 board with preimage / payment hash: {} / {}",
preimage.as_hex(), payment_hash.as_hex());
let mailbox_kp = self.seed.to_mailbox_keypair();
let mailbox_id = ark::mailbox::MailboxIdentifier::from_pubkey(mailbox_kp.public_key());
let req = protos::StartLightningReceiveRequest {
payment_hash: payment_hash.to_vec(),
amount_sat: amount.to_sat(),
min_cltv_delta: requested_min_cltv_delta as u32,
mailbox_id: Some(mailbox_id.to_vec()),
};
let resp = srv.client.start_lightning_receive(req).await?.into_inner();
info!("Ark Server is ready to receive LN payment to invoice: {}.", resp.bolt11);
let invoice = Bolt11Invoice::from_str(&resp.bolt11)
.context("invalid bolt11 invoice returned by Ark server")?;
self.db.store_lightning_receive(
payment_hash,
preimage,
&invoice,
requested_min_cltv_delta,
).await?;
Ok(invoice)
}
/// Looks up the stored lightning receive for the given payment hash.
///
/// Returns `Ok(None)` when the database has no record for it.
///
/// # Errors
///
/// Fails when the database query fails.
pub async fn lightning_receive_status(
    &self,
    payment: impl Into<PaymentHash>,
) -> anyhow::Result<Option<LightningReceive>> {
    let payment_hash: PaymentHash = payment.into();
    let receive = self.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?;
    Ok(receive)
}
/// Claims the HTLC VTXOs of a lightning receive by sending the payment
/// preimage to the server together with an arkoor cosign request.
///
/// On success the signed output VTXOs are validated and stored as spendable,
/// the HTLC inputs are marked as spent, the movement is finished as
/// `Successful`, the pending receive is finished in the database and
/// `receive` is overwritten with the record re-read from the database.
///
/// # Errors
///
/// Fails when the receive has no movement id or no HTLC VTXOs, when any key
/// lookup, builder step, server call or VTXO validation fails, or when a
/// database update fails. Note that the preimage is marked as revealed in the
/// database before the server call, so a later failure leaves that flag set.
async fn claim_lightning_receive(
&self,
receive: &mut LightningReceive,
) -> anyhow::Result<()> {
let movement_id = receive.movement_id
.context("No movement created for lightning receive")?;
let (mut srv, _) = self.require_server().await?;
// Inputs are the receive's HTLC VTXOs, sorted by VTXO id.
let inputs = {
ensure!(!receive.htlc_vtxos.is_empty(), "no HTLC VTXOs set on record yet");
let mut ret = receive.htlc_vtxos.iter().map(|v| &v.vtxo).collect::<Vec<_>>();
ret.sort_by_key(|v| v.id());
ret
};
// One keypair per input, in the same order as `inputs`.
let mut keypairs = Vec::with_capacity(inputs.len());
for v in &inputs {
keypairs.push(self.get_vtxo_key(*v).await?);
}
// The claimed outputs are locked to a newly derived wallet pubkey.
let (claim_keypair, _) = self.derive_store_next_keypair().await?;
let receive_policy = VtxoPolicy::new_pubkey(claim_keypair.public_key());
trace!("ln arkoor builder params: inputs: {:?}; policy: {:?}",
inputs.iter().map(|v| v.id()).collect::<Vec<_>>(), receive_policy,
);
let builder = ArkoorPackageBuilder::new_claim_all_without_checkpoints(
inputs.iter().copied().cloned(),
receive_policy.clone(),
).context("creating claim arkoor builder failed")?;
let builder = builder.generate_user_nonces(&keypairs)
.context("arkoor nonce generation for claim failed")?;
info!("Claiming arkoor against payment preimage");
// Persist that the preimage is revealed BEFORE it is sent to the server
// in the request below.
self.db.set_preimage_revealed(receive.payment_hash).await?;
let package_cosign_request = protos::ArkoorPackageCosignRequest::from(
builder.cosign_request(),
);
let resp = srv.client.claim_lightning_receive(protos::ClaimLightningReceiveRequest {
payment_hash: receive.payment_hash.to_byte_array().to_vec(),
payment_preimage: receive.payment_preimage.to_vec(),
cosign_request: Some(package_cosign_request),
}).await?.into_inner();
let cosign_resp = resp.try_into().context("invalid cosign response")?;
let outputs = builder.user_cosign(&keypairs, cosign_resp)
.context("claim arkoor cosign failed with user response")?
.build_signed_vtxos();
// Validate every output and accumulate the amount actually received.
let mut effective_balance = Amount::ZERO;
for vtxo in &outputs {
trace!("Validating Lightning receive claim VTXO {}: {}",
vtxo.id(), vtxo.serialize_hex(),
);
self.validate_vtxo(vtxo).await
.context("invalid arkoor from lightning receive")?;
effective_balance += vtxo.amount();
}
// Store the new outputs first, then retire the HTLC inputs.
self.store_spendable_vtxos(&outputs).await?;
self.mark_vtxos_as_spent(inputs).await?;
info!("Got arkoors from lightning: {}",
outputs.iter().map(|v| v.id().to_string()).collect::<Vec<_>>().join(", ")
);
self.movements.finish_movement_with_update(
movement_id,
MovementStatus::Successful,
MovementUpdate::new()
.effective_balance(effective_balance.to_signed()?)
.produced_vtxos(&outputs)
).await?;
self.db.finish_pending_lightning_receive(receive.payment_hash).await?;
// Hand the caller the record as it is now stored in the database.
*receive = self.db.fetch_lightning_receive_by_payment_hash(receive.payment_hash).await
.context("Database error")?
.context("Receive not found")?;
Ok(())
}
/// Builds the anti-DoS payload sent along with a prepare-claim request.
///
/// When `token` is given it is used verbatim. Otherwise a VTXO selected to
/// cover one satoshi is used as proof of ownership: an attestation over
/// `payment_hash` and the VTXO id is created with the VTXO's keypair.
///
/// # Errors
///
/// Fails when no token is given and either VTXO selection fails, the
/// selection is empty, or the selected VTXO's key cannot be loaded. The key
/// lookup is reported as an error rather than a panic so that the caller can
/// fall back to sending no anti-DoS payload.
async fn compute_lightning_receive_anti_dos(
    &self,
    payment_hash: PaymentHash,
    token: Option<&str>,
) -> anyhow::Result<LightningReceiveAntiDos> {
    if let Some(token) = token {
        return Ok(LightningReceiveAntiDos::Token(token.to_string()));
    }

    let vtxo = self.select_vtxos_to_cover(Amount::ONE_SAT).await
        .and_then(|vtxos| vtxos.into_iter().next()
            .context("have no spendable vtxo to prove ownership of")
        )?;
    let vtxo_keypair = self.get_vtxo_key(&vtxo).await
        .context("owned vtxo should be in database")?;
    let attestation = LightningReceiveAttestation::new(payment_hash, vtxo.id(), &vtxo_keypair);

    Ok(LightningReceiveAntiDos::InputVtxo(protos::InputVtxo {
        vtxo_id: vtxo.id().to_bytes().to_vec(),
        attestation: attestation.serialize(),
    }))
}
/// Checks the server-side status of a lightning receive and, once the
/// payment is accepted, obtains and stores the HTLC VTXOs for it.
///
/// Returns:
/// - `Ok(Some(receive))` with HTLC VTXOs set, either because they were
///   already stored on the record or because they were just received,
///   validated and stored as locked VTXOs;
/// - `Ok(None)` when the server reports the receive as `Created`, or as
///   `Canceled` (in which case the receive is exited or canceled locally).
///
/// `wait` is forwarded to the server's check request. `token` is forwarded
/// to the anti-DoS computation; if that computation fails the request is
/// sent without an anti-DoS payload.
///
/// # Errors
///
/// Fails when there is no stored receive for `payment_hash`, when the server
/// reports `Settled` or an unknown status, when a server call fails, when a
/// returned HTLC VTXO is invalid or does not match the requested payment
/// hash, pubkey or expiry, when the fee cannot be computed, when the VTXOs
/// do not cover the invoice amount, or when a database update fails.
async fn check_lightning_receive(
    &self,
    payment_hash: PaymentHash,
    wait: bool,
    token: Option<&str>,
) -> anyhow::Result<Option<LightningReceive>> {
    let (mut srv, ark_info) = self.require_server().await?;
    let current_height = self.chain.tip().await?;

    let mut receive = self.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
        .context("no pending lightning receive found for payment hash, might already be claimed")?;

    // HTLC VTXOs are already on the record; nothing left to prepare.
    if !receive.htlc_vtxos.is_empty() {
        return Ok(Some(receive))
    }

    trace!("Requesting updates for ln-receive to server with for wait={} and hash={}", wait, payment_hash);
    let sub = srv.client.check_lightning_receive(protos::CheckLightningReceiveRequest {
        hash: payment_hash.to_byte_array().to_vec(), wait,
    }).await?.into_inner();

    let status = protos::LightningReceiveStatus::try_from(sub.status)
        .with_context(|| format!("unknown payment status: {}", sub.status))?;
    debug!("Received status {:?} for {}", status, payment_hash);

    match status {
        // Only these two states continue to the prepare-claim step.
        protos::LightningReceiveStatus::Accepted |
        protos::LightningReceiveStatus::HtlcsReady => {},
        protos::LightningReceiveStatus::Created => {
            return Ok(None);
        },
        protos::LightningReceiveStatus::Settled => bail!("payment already settled"),
        protos::LightningReceiveStatus::Canceled => {
            warn!("payment was canceled. removing pending lightning receive");
            self.exit_or_cancel_lightning_receive(&receive).await?;
            return Ok(None);
        },
    }

    // Best effort: a failure here only drops the anti-DoS payload.
    let lightning_receive_anti_dos = match self.compute_lightning_receive_anti_dos(
        payment_hash, token,
    ).await {
        Ok(anti_dos) => Some(anti_dos),
        Err(e) => {
            warn!("Could not compute anti-dos: {e:#}. Trying without");
            None
        },
    };

    let htlc_recv_expiry = current_height + receive.htlc_recv_cltv_delta as BlockHeight;

    let (next_keypair, _) = self.derive_store_next_keypair().await?;

    let req = protos::PrepareLightningReceiveClaimRequest {
        payment_hash: receive.payment_hash.to_vec(),
        user_pubkey: next_keypair.public_key().serialize().to_vec(),
        htlc_recv_expiry,
        lightning_receive_anti_dos,
    };
    let res = srv.client.prepare_lightning_receive_claim(req).await
        .context("error preparing lightning receive claim")?.into_inner();

    let vtxos = res.htlc_vtxos.into_iter()
        .map(|b| Vtxo::deserialize(&b))
        .collect::<Result<Vec<_>, _>>()
        .context("invalid htlc vtxos from server")?;

    // Every returned VTXO must validate and carry exactly the HTLC-recv
    // policy that was requested (hash, pubkey, and an expiry no earlier than
    // the one asked for).
    let mut htlc_amount = Amount::ZERO;
    for vtxo in &vtxos {
        trace!("Received HTLC VTXO {} from server: {}", vtxo.id(), vtxo.serialize_hex());
        self.validate_vtxo(vtxo).await
            .context("received invalid HTLC VTXO from server")?;
        htlc_amount += vtxo.amount();

        if let VtxoPolicy::ServerHtlcRecv(p) = vtxo.policy() {
            if p.payment_hash != receive.payment_hash {
                bail!("invalid payment hash on HTLC VTXOs received from server: {}",
                    p.payment_hash,
                );
            }
            if p.user_pubkey != next_keypair.public_key() {
                bail!("invalid pubkey on HTLC VTXOs received from server: {}", p.user_pubkey);
            }
            if p.htlc_expiry < htlc_recv_expiry {
                bail!("HTLC VTXO expiry height is less than requested: Requested {}, received {}", htlc_recv_expiry, p.htlc_expiry);
            }
        } else {
            bail!("invalid HTLC VTXO policy: {:?}", vtxo.policy());
        }
    }

    let invoice_amount = receive.invoice.get_payment_amount(None)
        .context("ln receive invoice should have amount")?;

    // Prefer the fee derived from the amount the server says it received,
    // but only if it reconciles with the HTLC VTXO total; otherwise fall
    // back to the fee on the invoice amount.
    let server_received_amount = res.receive.map(|r| Amount::from_sat(r.amount_sat));
    let fee = {
        let fee = server_received_amount
            .and_then(|a| ark_info.fees.lightning_receive.calculate(a));
        match (server_received_amount, fee) {
            (Some(amount), Some(fee)) if htlc_amount + fee == amount => {
                fee
            },
            _ => {
                // The fee schedule comes from the server's current info and
                // may differ from the one checked at invoice creation, so an
                // overflow is reported as an error instead of panicking.
                ark_info.fees.lightning_receive.calculate(invoice_amount)
                    .context("fee overflowed")?
            }
        }
    };

    let received = htlc_amount + fee;
    ensure!(received >= invoice_amount,
        "Server didn't return enough VTXOs to cover invoice amount"
    );

    // Reuse the movement if one was already created for this receive.
    let movement_id = if let Some(movement_id) = receive.movement_id {
        movement_id
    } else {
        self.movements.new_movement_with_update(
            Subsystem::LIGHTNING_RECEIVE,
            LightningReceiveMovement::Receive.to_string(),
            MovementUpdate::new()
                .intended_balance(invoice_amount.to_signed()?)
                .effective_balance(htlc_amount.to_signed()?)
                .fee(fee)
                .metadata(LightningMovement::metadata(
                    receive.payment_hash, &vtxos, Some(receive.payment_preimage),
                ))
                .received_on(
                    [MovementDestination::new(receive.invoice.clone().into(), received)],
                ),
        ).await?
    };

    self.store_locked_vtxos(&vtxos, Some(movement_id)).await?;

    let vtxo_ids = vtxos.iter().map(|v| v.id()).collect::<Vec<_>>();
    self.db.update_lightning_receive(payment_hash, &vtxo_ids, movement_id).await?;

    // Re-read the VTXOs in their wallet representation for the returned record.
    let mut wallet_vtxos = vec![];
    for vtxo in vtxos {
        let v = self.db.get_wallet_vtxo(vtxo.id()).await?
            .context("Failed to get wallet VTXO for lightning receive")?;
        wallet_vtxos.push(v);
    }

    receive.htlc_vtxos = wallet_vtxos;
    receive.movement_id = Some(movement_id);

    Ok(Some(receive))
}
async fn exit_lightning_receive(
&self,
lightning_receive: &LightningReceive,
) -> anyhow::Result<()> {
ensure!(!lightning_receive.htlc_vtxos.is_empty(), "no HTLC VTXOs to exit");
let vtxos = lightning_receive.htlc_vtxos.iter().map(|v| &v.vtxo).collect::<Vec<_>>();
info!("Exiting HTLC VTXOs for lightning_receive with payment hash {}", lightning_receive.payment_hash);
self.exit.write().await.start_exit_for_vtxos(&vtxos).await?;
if let Some(movement_id) = lightning_receive.movement_id {
self.movements.finish_movement_with_update(
movement_id,
MovementStatus::Failed,
MovementUpdate::new().exited_vtxos(vtxos),
).await?;
} else {
error!("movement id is missing but we disclosed preimage: {}", lightning_receive.payment_hash);
}
self.db.finish_pending_lightning_receive(lightning_receive.payment_hash).await?;
Ok(())
}
/// Closes a lightning receive, choosing between an exit and a cancellation.
///
/// - HTLC VTXOs present and preimage revealed: delegates to
///   `exit_lightning_receive`.
/// - HTLC VTXOs present, preimage not revealed: marks the HTLC VTXOs as
///   spent and cancels the movement (a missing movement id is logged).
/// - No HTLC VTXOs but preimage revealed: logs an error and cancels the
///   movement if there is one.
/// - Neither: no movement is touched.
///
/// In every non-exit case the pending receive is finished in the database;
/// cancelled movements get a zero effective balance.
///
/// # Errors
///
/// Fails when the exit, the VTXO state change, or the movement or database
/// update fails.
pub(crate) async fn exit_or_cancel_lightning_receive(
    &self,
    lightning_receive: &LightningReceive,
) -> anyhow::Result<()> {
    let htlc_vtxos = &lightning_receive.htlc_vtxos;
    let has_htlcs = !htlc_vtxos.is_empty();
    let preimage_revealed = lightning_receive.preimage_revealed_at.is_some();

    if has_htlcs && preimage_revealed {
        return self.exit_lightning_receive(lightning_receive).await;
    }

    // Decide which movement, if any, has to be closed as canceled.
    let movement_to_cancel = if has_htlcs {
        warn!("HTLC-recv VTXOs are about to expire, but preimage has not been disclosed yet. Canceling");
        self.mark_vtxos_as_spent(htlc_vtxos).await?;
        if lightning_receive.movement_id.is_none() {
            error!("movement id is missing but we got HTLC vtxos: {}", lightning_receive.payment_hash);
        }
        lightning_receive.movement_id
    } else if preimage_revealed {
        error!("No HTLC vtxos set on ln receive but preimage has been disclosed. Canceling");
        lightning_receive.movement_id
    } else {
        None
    };

    if let Some(movement_id) = movement_to_cancel {
        let update = MovementUpdate::new().effective_balance(SignedAmount::ZERO);
        self.movements
            .finish_movement_with_update(movement_id, MovementStatus::Canceled, update)
            .await?;
    }

    self.db.finish_pending_lightning_receive(lightning_receive.payment_hash).await?;
    Ok(())
}
pub async fn cancel_lightning_receive(
&self,
payment_hash: PaymentHash,
) -> anyhow::Result<()> {
let receive = self.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
.context("no pending lightning receive found for this payment hash")?;
if receive.preimage_revealed_at.is_some() {
bail!("cannot cancel: preimage has already been revealed");
}
if receive.finished_at.is_some() {
bail!("lightning receive is already finished");
}
let (mut srv, _) = self.require_server().await?;
srv.client.cancel_lightning_receive(protos::CancelLightningReceiveRequest {
payment_hash: payment_hash.to_vec(),
}).await.context("server refused cancellation")?;
self.exit_or_cancel_lightning_receive(&receive).await?;
Ok(())
}
pub async fn try_claim_lightning_receive(
&self,
payment_hash: PaymentHash,
wait: bool,
token: Option<&str>,
) -> anyhow::Result<LightningReceive> {
trace!("Claiming lightning receive for payment hash: {}", payment_hash);
{
let mut inflight = self.inflight_lightning_payments.lock().await;
if !inflight.insert(payment_hash) {
bail!("Receive operation already in progress for this payment");
}
}
let result = self.try_claim_lightning_receive_inner(payment_hash, wait, token).await;
{
let mut inflight = self.inflight_lightning_payments.lock().await;
inflight.remove(&payment_hash);
}
result
}
/// Runs the check step and, when HTLC VTXOs are available, the claim step
/// for one lightning receive.
///
/// When the check yields no receive, the stored record is returned as is.
/// A receive that is already finished or has no HTLC VTXOs is returned
/// without claiming. If the claim fails, the HTLC VTXOs are exited and the
/// claim error is returned.
///
/// # Errors
///
/// Fails when the check fails, when no record exists for `payment_hash`,
/// when the claim fails, or when exiting after a failed claim fails (in
/// which case the exit error is the one returned).
async fn try_claim_lightning_receive_inner(
    &self,
    payment_hash: PaymentHash,
    wait: bool,
    token: Option<&str>,
) -> anyhow::Result<LightningReceive> {
    let checked = self.check_lightning_receive(payment_hash, wait, token).await?;
    let mut receive = if let Some(receive) = checked {
        receive
    } else {
        return self.db.fetch_lightning_receive_by_payment_hash(payment_hash).await?
            .context("No receive for payment_hash");
    };

    // Nothing to claim: already done, or the HTLC VTXOs are not there yet.
    if receive.finished_at.is_some() || receive.htlc_vtxos.is_empty() {
        return Ok(receive);
    }

    if let Err(e) = self.claim_lightning_receive(&mut receive).await {
        error!("Failed to claim htlcs for payment_hash: {}", receive.payment_hash);
        warn!("Exiting lightning receive VTXOs");
        self.exit_lightning_receive(&receive).await?;
        return Err(e);
    }

    Ok(receive)
}
pub async fn try_claim_all_lightning_receives(&self, wait: bool) -> anyhow::Result<Vec<LightningReceive>> {
let pending = self.pending_lightning_receives().await?;
let total = pending.len();
if total == 0 {
return Ok(vec![]);
}
let results: Vec<_> = tokio_stream::iter(pending)
.map(|rcv| async move {
self.try_claim_lightning_receive(rcv.invoice.into(), wait, None).await
})
.buffer_unordered(3)
.collect()
.await;
let mut claimed = vec![];
let mut failed = 0;
for result in results {
match result {
Ok(receive) => claimed.push(receive),
Err(e) => {
error!("Error claiming lightning receive: {:#}", e);
failed += 1;
}
}
}
if failed > 0 {
info!(
"Lightning receive claims: {} succeeded, {} failed out of {} pending",
claimed.len(), failed, total
);
}
if claimed.is_empty() {
anyhow::bail!("All {} lightning receive claim(s) failed", failed);
}
Ok(claimed)
}
}