pub extern crate ark;
pub extern crate bip39;
pub extern crate lightning_invoice;
pub extern crate lnurl as lnurllib;
use std::collections::HashMap;
use anyhow::Context;
use ark::tree::signed::UnlockHash;
use bitcoin::hashes::Hash;
use bitcoin::Amount;
use bitcoin::hex::DisplayHex;
use bitcoin::secp256k1::Keypair;
use futures::{FutureExt, Stream, StreamExt};
use log::{debug, error, info, trace, warn};
use tokio_util::sync::CancellationToken;
use ark::{ProtocolEncoding, Vtxo, VtxoId};
use ark::lightning::PaymentHash;
use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
use ark::vtxo::Full;
use server_rpc::protos;
use server_rpc::protos::mailbox_server::MailboxMessage;
use crate::Wallet;
use crate::movement::{MovementDestination, MovementStatus};
use crate::movement::update::MovementUpdate;
use crate::subsystem::{ArkoorMovement, Subsystem};
const MAX_MAILBOX_REQUEST_BURST: usize = 10;
impl Wallet {
pub fn mailbox_keypair(&self) -> Keypair {
self.seed.to_mailbox_keypair()
}
pub fn recovery_mailbox_keypair(&self) -> Keypair {
self.seed.to_recovery_mailbox_keypair()
}
pub fn mailbox_identifier(&self) -> MailboxIdentifier {
let mailbox_kp = self.mailbox_keypair();
MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
}
pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
let mailbox_kp = self.recovery_mailbox_keypair();
MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
}
pub fn mailbox_authorization(
&self,
authorization_expiry: chrono::DateTime<chrono::Local>,
) -> MailboxAuthorization {
MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
}
pub async fn subscribe_mailbox_messages(
&self,
since_checkpoint: Option<u64>,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
let (mut srv, _) = self.require_server().await?;
let checkpoint = if let Some(since) = since_checkpoint {
since
} else {
self.get_mailbox_checkpoint().await?
};
let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
let auth = self.mailbox_authorization(expiry);
let mailbox_id = auth.mailbox();
let req = protos::mailbox_server::MailboxRequest {
unblinded_id: mailbox_id.to_vec(),
authorization: Some(auth.serialize()),
checkpoint: checkpoint,
};
let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
let m = m.context("received error on mailbox message stream")?;
Ok::<_, anyhow::Error>(m)
});
Ok(stream)
}
pub async fn subscribe_process_mailbox_messages(
&self,
since_checkpoint: Option<u64>,
shutdown: CancellationToken,
) -> anyhow::Result<()> {
let mut reconnect_count = 0;
const MAX_RECONNECT_ATTEMPTS: usize = 5;
'outer: loop {
let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
trace!("Connected to mailbox stream with server");
loop {
futures::select! {
message = stream.next().fuse() => {
if let Some(message) = message {
reconnect_count = 0;
let message = message.context("error on mailbox message stream")?;
self.process_mailbox_message(message).await;
} else if reconnect_count >= MAX_RECONNECT_ATTEMPTS {
bail!("Mailbox stream dropped by server, giving up to retry later");
} else {
reconnect_count += 1;
warn!("Mailbox stream dropped by server, reconnecting");
continue 'outer;
}
},
_ = shutdown.cancelled().fuse() => {
info!("Shutdown signal received! Shutting mailbox messages process...");
return Ok(());
},
}
}
}
}
pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
let (mut srv, _) = self.require_server().await?;
let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
let auth = self.mailbox_authorization(expiry);
let mailbox_id = auth.mailbox();
for _ in 0..MAX_MAILBOX_REQUEST_BURST {
let checkpoint = self.get_mailbox_checkpoint().await?;
let mailbox_req = protos::mailbox_server::MailboxRequest {
unblinded_id: mailbox_id.to_vec(),
authorization: Some(auth.serialize()),
checkpoint,
};
let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
.context("error fetching mailbox")?.into_inner();
debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
for mailbox_msg in mailbox_resp.messages {
self.process_mailbox_message(mailbox_msg).await;
}
if !mailbox_resp.have_more {
break;
}
}
Ok(())
}
async fn process_raw_vtxos(
&self,
raw_vtxos: Vec<Vec<u8>>,
) -> Vec<Vtxo<Full>> {
let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
for bytes in &raw_vtxos {
let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
Ok(vtxo) => vtxo,
Err(e) => {
error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
invalid_vtxos.push(bytes);
continue;
}
};
if let Err(e) = self.validate_vtxo(&vtxo).await {
error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
invalid_vtxos.push(bytes);
continue;
}
valid_vtxos.push(vtxo);
}
if !invalid_vtxos.is_empty() {
error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
}
valid_vtxos
}
pub(crate) async fn process_mailbox_message(
&self,
mailbox_msg: MailboxMessage,
) {
use protos::mailbox_server::mailbox_message::Message;
match mailbox_msg.message {
Some(Message::Arkoor(msg)) => {
let result = self
.process_received_arkoor_package(msg.vtxos, Some(mailbox_msg.checkpoint)).await;
if let Err(e) = result {
error!("Error processing received arkoor package: {:#}", e);
}
}
Some(Message::RoundParticipationCompleted(m)) => {
info!("Server informed that round participation is ready, unlock_hash:{:?}",
UnlockHash::from_slice(&m.unlock_hash).ok(),
);
if let Err(e) = self.sync_pending_rounds().await {
error!("Error syncing pending rounds: {:#}", e);
}
},
Some(Message::IncomingLightningPayment(msg)) => {
if let Err(e) = self.handle_lightning_receive_notification(msg, mailbox_msg.checkpoint).await {
error!("Error handling lightning receive notification: {:#}", e);
}
},
Some(Message::RecoveryVtxoIds(_)) => {
trace!("Received recovery VTXO IDs, ignoring");
}
None => {
warn!("Received unknown mailbox message, ignoring");
}
}
}
async fn process_received_arkoor_package(
&self,
raw_vtxos: Vec<Vec<u8>>,
checkpoint: Option<u64>,
) -> anyhow::Result<()> {
let vtxos = self.process_raw_vtxos(raw_vtxos).await;
let mut new_vtxos = Vec::with_capacity(vtxos.len());
for vtxo in &vtxos {
if self.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
continue;
}
trace!("Received arkoor VTXO {} for {} (checkpoint {:?})", vtxo.id(), vtxo.amount(), checkpoint);
new_vtxos.push(vtxo);
}
if new_vtxos.is_empty() {
return Ok(());
}
if let Err(e) = self.register_vtxo_transactions_with_server(&new_vtxos).await {
warn!("Failed to register received arkoor vtxo transactions with server: {:#}", e);
}
let balance = vtxos
.iter()
.map(|vtxo| vtxo.amount()).sum::<Amount>()
.to_signed()?;
self.store_spendable_vtxos(&vtxos).await?;
let mut received_by_address = HashMap::<ark::Address, Amount>::new();
for vtxo in &vtxos {
if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
if let Ok(address) = self.peek_address(index).await {
*received_by_address.entry(address).or_default() += vtxo.amount();
}
}
}
let received_on: Vec<_> = received_by_address
.iter()
.map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
.collect();
let movement_id = self.movements.new_finished_movement(
Subsystem::ARKOOR,
ArkoorMovement::Receive.to_string(),
MovementStatus::Successful,
MovementUpdate::new()
.produced_vtxos(&vtxos)
.intended_and_effective_balance(balance)
.received_on(received_on),
).await?;
info!("Received arkoor (movement {}) for {}", movement_id, balance);
if let Some(checkpoint) = checkpoint {
self.store_mailbox_checkpoint(checkpoint).await?;
}
Ok(())
}
async fn handle_lightning_receive_notification(
&self,
notif: protos::mailbox_server::IncomingLightningPaymentMessage,
checkpoint: u64,
) -> anyhow::Result<()> {
let payment_hash = PaymentHash::try_from(notif.payment_hash)
.context("invalid payment hash in lightning receive notification")?;
debug!("Lightning receive notification: payment_hash={}", payment_hash);
match self.try_claim_lightning_receive(payment_hash, false, None).await {
Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
}
self.store_mailbox_checkpoint(checkpoint).await?;
Ok(())
}
pub async fn post_recovery_vtxo_ids(
&self,
vtxo_ids: impl IntoIterator<Item = VtxoId>,
) -> anyhow::Result<()> {
let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
if vtxo_ids.is_empty() {
return Ok(());
}
let nb_vtxos = vtxo_ids.len();
let (mut srv, _) = self.require_server().await?;
let unblinded_id = self.recovery_mailbox_identifier().to_vec();
let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };
srv.mailbox_client.post_recovery_vtxo_ids(req).await
.context("error posting recovery vtxo IDs to server")?;
debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
Ok(())
}
pub async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
Ok(self.db.get_mailbox_checkpoint().await?)
}
async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
Ok(self.db.store_mailbox_checkpoint(checkpoint).await?)
}
}