bark-wallet 0.1.4

Wallet library and CLI for the bitcoin Ark protocol built by Second
Documentation

pub extern crate ark;

pub extern crate bip39;
pub extern crate lightning_invoice;
pub extern crate lnurl as lnurllib;

use std::collections::HashMap;

use anyhow::Context;
use ark::tree::signed::UnlockHash;
use bitcoin::hashes::Hash;
use bitcoin::Amount;
use bitcoin::hex::DisplayHex;
use bitcoin::secp256k1::Keypair;
use futures::{FutureExt, Stream, StreamExt};
use log::{debug, error, info, trace, warn};
use tokio_util::sync::CancellationToken;

use ark::{ProtocolEncoding, Vtxo, VtxoId};
use ark::lightning::PaymentHash;
use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
use ark::vtxo::Full;
use server_rpc::protos;
use server_rpc::protos::mailbox_server::MailboxMessage;

use crate::Wallet;
use crate::movement::{MovementDestination, MovementStatus};
use crate::movement::update::MovementUpdate;
use crate::subsystem::{ArkoorMovement, Subsystem};


/// The maximum number of times we will call the fetch mailbox endpoint in one go
///
/// We can't trust the server to honestly tell us to keep trying more forever.
/// A malicious server could send us empty messages or invalid messages and
/// lock up our resources forever. So we limit the number of times we will fetch.
/// If a user actually has more messages left, he will have to call sync again.
///
/// (Note that currently the server sends 100 messages per fetch, so this would
/// only happen for users with more than 1000 pending items.)
const MAX_MAILBOX_REQUEST_BURST: usize = 10;

impl Wallet {
	/// Get the keypair used for the server mailbox
	pub fn mailbox_keypair(&self) -> Keypair {
		self.seed.to_mailbox_keypair()
	}

	/// Get the keypair used for the server recovery mailbox
	pub fn recovery_mailbox_keypair(&self) -> Keypair {
		self.seed.to_recovery_mailbox_keypair()
	}

	/// Get this wallet's server mailbox ID
	pub fn mailbox_identifier(&self) -> MailboxIdentifier {
		let mailbox_kp = self.mailbox_keypair();
		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
	}

	/// Get this wallet's server recovery mailbox ID
	pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
		let mailbox_kp = self.recovery_mailbox_keypair();
		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
	}

	/// Create a mailbox authorization that is valid until the given expiry time
	///
	/// This authorization can be used by third parties to lookup your mailbox
	/// with the Ark server.
	pub fn mailbox_authorization(
		&self,
		authorization_expiry: chrono::DateTime<chrono::Local>,
	) -> MailboxAuthorization {
		MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
	}

	/// Subscribe to mailbox message stream.
	///
	/// If `since` is `None`, the stream will start from the last checkpoint stored in the database.
	///
	/// Returns a stream of mailbox messages.
	pub async fn subscribe_mailbox_messages(
		&self,
		since_checkpoint: Option<u64>,
	) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
		let (mut srv, _) = self.require_server().await?;

		let checkpoint = if let Some(since) = since_checkpoint {
			since
		} else {
			self.get_mailbox_checkpoint().await?
		};

		// we just need a short authorization for the stream initialization
		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
		let auth = self.mailbox_authorization(expiry);
		let mailbox_id = auth.mailbox();

		let req = protos::mailbox_server::MailboxRequest {
			unblinded_id: mailbox_id.to_vec(),
			authorization: Some(auth.serialize()),
			checkpoint: checkpoint,
		};

		let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
			let m = m.context("received error on mailbox message stream")?;
			Ok::<_, anyhow::Error>(m)
		});

		Ok(stream)
	}

	/// Similar to [Wallet::subscribe_mailbox_messages] but it will also process each mailbox
	/// message indefinitely. This method won't stop until the given `shutdown` `CancellationToken`
	/// is triggered.
	///
	/// If `since_checkpoint` is `None`, the stream will start from the last checkpoint stored in
	/// the database.
	///
	/// Returns only once the stream is closed.
	pub async fn subscribe_process_mailbox_messages(
		&self,
		since_checkpoint: Option<u64>,
		shutdown: CancellationToken,
	) -> anyhow::Result<()> {
		let mut reconnect_count = 0;
		const MAX_RECONNECT_ATTEMPTS: usize = 5;

		'outer: loop {
			let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
			trace!("Connected to mailbox stream with server");

			loop {
				futures::select! {
					message = stream.next().fuse() => {
						if let Some(message) = message {
							reconnect_count = 0;
							let message = message.context("error on mailbox message stream")?;
							self.process_mailbox_message(message).await;
						} else if reconnect_count >= MAX_RECONNECT_ATTEMPTS {
							bail!("Mailbox stream dropped by server, giving up to retry later");
						} else {
							reconnect_count += 1;
							warn!("Mailbox stream dropped by server, reconnecting");
							continue 'outer;
						}
					},
					_ = shutdown.cancelled().fuse() => {
						info!("Shutdown signal received! Shutting mailbox messages process...");
						return Ok(());
					},
				}
			}
		}
	}

	/// Sync with the mailbox on the Ark server and look for out-of-round received VTXOs.
	pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
		let (mut srv, _) = self.require_server().await?;

		// we should be able to do all our syncing in 10 minutes
		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
		let auth = self.mailbox_authorization(expiry);
		let mailbox_id = auth.mailbox();

		for _ in 0..MAX_MAILBOX_REQUEST_BURST {
			let checkpoint = self.get_mailbox_checkpoint().await?;
			let mailbox_req = protos::mailbox_server::MailboxRequest {
				unblinded_id: mailbox_id.to_vec(),
				authorization: Some(auth.serialize()),
				checkpoint,
			};

			let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
				.context("error fetching mailbox")?.into_inner();
			debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());

			for mailbox_msg in mailbox_resp.messages {
				self.process_mailbox_message(mailbox_msg).await;
			}

			if !mailbox_resp.have_more {
				break;
			}
		}

		Ok(())
	}

	/// Turn raw byte arrays into VTXOs, then validate them.
	///
	/// This function doesn't return a result on purpose,
	/// because we want to make sure we don't early return on
	/// the first error. This ensure we process all VTXOs, even
	/// if some are invalid, and print everything we received.
	async fn process_raw_vtxos(
		&self,
		raw_vtxos: Vec<Vec<u8>>,
	) -> Vec<Vtxo<Full>> {
		let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
		let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());

		for bytes in &raw_vtxos {
			let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
				Ok(vtxo) => vtxo,
				Err(e) => {
					error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
					invalid_vtxos.push(bytes);
					continue;
				}
			};

			if let Err(e) = self.validate_vtxo(&vtxo).await {
				error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
				invalid_vtxos.push(bytes);
				continue;
			}

			valid_vtxos.push(vtxo);
		}

		// We log all invalid VTXOs to keep track
		if !invalid_vtxos.is_empty() {
			error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
		}

		valid_vtxos
	}

	pub(crate) async fn process_mailbox_message(
		&self,
		mailbox_msg: MailboxMessage,
	) {
		use protos::mailbox_server::mailbox_message::Message;

		match mailbox_msg.message {
			Some(Message::Arkoor(msg)) => {
				let result = self
					.process_received_arkoor_package(msg.vtxos, Some(mailbox_msg.checkpoint)).await;
				if let Err(e) = result {
					error!("Error processing received arkoor package: {:#}", e);
				}
			}
			Some(Message::RoundParticipationCompleted(m)) => {
				// Do we want to do custom code paths for progressing the round participations
				// via the payment hashes returnded by the server?
				info!("Server informed that round participation is ready, unlock_hash:{:?}",
					UnlockHash::from_slice(&m.unlock_hash).ok(),
				);
				if let Err(e) = self.sync_pending_rounds().await {
					error!("Error syncing pending rounds: {:#}", e);
				}
			},
			Some(Message::IncomingLightningPayment(msg)) => {
				if let Err(e) = self.handle_lightning_receive_notification(msg, mailbox_msg.checkpoint).await {
					error!("Error handling lightning receive notification: {:#}", e);
				}
			},
			Some(Message::RecoveryVtxoIds(_)) => {
				trace!("Received recovery VTXO IDs, ignoring");
			}
			None => {
				warn!("Received unknown mailbox message, ignoring");
			}
		}
	}

	async fn process_received_arkoor_package(
		&self,
		raw_vtxos: Vec<Vec<u8>>,
		checkpoint: Option<u64>,
	) -> anyhow::Result<()> {
		let vtxos = self.process_raw_vtxos(raw_vtxos).await;

		let mut new_vtxos = Vec::with_capacity(vtxos.len());
		for vtxo in &vtxos {
			// Skip if already in wallet
			if self.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
				debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
				continue;
			}

			trace!("Received arkoor VTXO {} for {} (checkpoint {:?})", vtxo.id(), vtxo.amount(), checkpoint);
			new_vtxos.push(vtxo);
		}

		if new_vtxos.is_empty() {
			return Ok(());
		}

		// Redundantly re-register the received vtxos with the server. An
		// up-to-date sender already does this after cosign, but older
		// senders may not, so we do it on receive too to make sure the
		// server has signed_tx rows for our spendable vtxos. Any failure
		// is logged and swallowed: the receive must still proceed so we
		// don't lose track of the vtxos locally, and later spends will
		// retry registration if still needed.
		if let Err(e) = self.register_vtxo_transactions_with_server(&new_vtxos).await {
			warn!("Failed to register received arkoor vtxo transactions with server: {:#}", e);
		}

		let balance = vtxos
			.iter()
			.map(|vtxo| vtxo.amount()).sum::<Amount>()
			.to_signed()?;
		self.store_spendable_vtxos(&vtxos).await?;

		// Build received_on destinations from received VTXOs, aggregated by address
		let mut received_by_address = HashMap::<ark::Address, Amount>::new();
		for vtxo in &vtxos {
			if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
				if let Ok(address) = self.peek_address(index).await {
					*received_by_address.entry(address).or_default() += vtxo.amount();
				}
			}
		}
		let received_on: Vec<_> = received_by_address
			.iter()
			.map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
			.collect();

		let movement_id = self.movements.new_finished_movement(
			Subsystem::ARKOOR,
			ArkoorMovement::Receive.to_string(),
			MovementStatus::Successful,
			MovementUpdate::new()
				.produced_vtxos(&vtxos)
				.intended_and_effective_balance(balance)
				.received_on(received_on),
		).await?;

		info!("Received arkoor (movement {}) for {}", movement_id, balance);

		if let Some(checkpoint) = checkpoint {
			self.store_mailbox_checkpoint(checkpoint).await?;
		}

		Ok(())
	}

	/// Handle a lightning receive notification from the mailbox.
	///
	/// This is a signal that the server has received a lightning payment for us
	/// and we should come online to claim it.
	async fn handle_lightning_receive_notification(
		&self,
		notif: protos::mailbox_server::IncomingLightningPaymentMessage,
		checkpoint: u64,
	) -> anyhow::Result<()> {
		let payment_hash = PaymentHash::try_from(notif.payment_hash)
			.context("invalid payment hash in lightning receive notification")?;

		debug!("Lightning receive notification: payment_hash={}", payment_hash);

		match self.try_claim_lightning_receive(payment_hash, false, None).await {
			Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
			Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
		}

		self.store_mailbox_checkpoint(checkpoint).await?;
		Ok(())
	}

	/// Post vtxo IDs to the server's recovery mailbox
	pub async fn post_recovery_vtxo_ids(
		&self,
		vtxo_ids: impl IntoIterator<Item = VtxoId>,
	) -> anyhow::Result<()> {
		let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
		if vtxo_ids.is_empty() {
			return Ok(());
		}
		let nb_vtxos = vtxo_ids.len();

		let (mut srv, _) = self.require_server().await?;
		let unblinded_id = self.recovery_mailbox_identifier().to_vec();
		let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };

		srv.mailbox_client.post_recovery_vtxo_ids(req).await
			.context("error posting recovery vtxo IDs to server")?;

		debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
		Ok(())
	}

	/// Return the stored mailbox checkpoint — the tip position the wallet
	/// has consumed up to. After a successful [`sync_mailbox`], this value
	/// reflects the server's latest advertised tip.
	pub async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
		Ok(self.db.get_mailbox_checkpoint().await?)
	}

	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
		Ok(self.db.store_mailbox_checkpoint(checkpoint).await?)
	}
}