Skip to main content

bark/
mailbox.rs

1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9
10use anyhow::Context;
11use ark::tree::signed::UnlockHash;
12use bitcoin::hashes::Hash;
13use bitcoin::Amount;
14use bitcoin::hex::DisplayHex;
15use bitcoin::secp256k1::Keypair;
16use futures::{FutureExt, Stream, StreamExt};
17use log::{debug, error, info, trace, warn};
18use tokio_util::sync::CancellationToken;
19
20use ark::{ProtocolEncoding, Vtxo, VtxoId};
21use ark::lightning::{PaymentHash, Preimage};
22use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
23use ark::vtxo::Full;
24use server_rpc::protos;
25use server_rpc::protos::mailbox_server::MailboxMessage;
26
27use crate::Wallet;
28use crate::actions::DriveMode;
29use crate::actions::lightning::pay::Progress;
30use crate::movement::{MovementDestination, MovementStatus};
31use crate::movement::update::MovementUpdate;
32use crate::subsystem::{ArkoorMovement, Subsystem};
33
34
/// The maximum number of times we will call the fetch mailbox endpoint in one go
///
/// We can't trust the server to honestly tell us to keep trying more forever.
/// A malicious server could send us empty messages or invalid messages and
/// lock up our resources forever. So we limit the number of times we will fetch.
/// If a user actually has more messages left, they will have to call sync again.
///
/// (Note that currently the server sends 100 messages per fetch, so this would
/// only happen for users with more than 1000 pending items.)
44const MAX_MAILBOX_REQUEST_BURST: usize = 10;
45
46impl Wallet {
47	/// Get the keypair used for the server mailbox
48	pub fn mailbox_keypair(&self) -> Keypair {
49		self.inner.seed.to_mailbox_keypair()
50	}
51
52	/// Get the keypair used for the server recovery mailbox
53	pub fn recovery_mailbox_keypair(&self) -> Keypair {
54		self.inner.seed.to_recovery_mailbox_keypair()
55	}
56
57	/// Get this wallet's server mailbox ID
58	pub fn mailbox_identifier(&self) -> MailboxIdentifier {
59		let mailbox_kp = self.mailbox_keypair();
60		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
61	}
62
63	/// Get this wallet's server recovery mailbox ID
64	pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
65		let mailbox_kp = self.recovery_mailbox_keypair();
66		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
67	}
68
69	/// Create a mailbox authorization that is valid until the given expiry time
70	///
71	/// This authorization can be used by third parties to lookup your mailbox
72	/// with the Ark server.
73	pub fn mailbox_authorization(
74		&self,
75		authorization_expiry: chrono::DateTime<chrono::Local>,
76	) -> MailboxAuthorization {
77		MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
78	}
79
80	/// Subscribe to mailbox message stream.
81	///
82	/// If `since` is `None`, the stream will start from the last checkpoint stored in the database.
83	///
84	/// Returns a stream of mailbox messages.
85	pub async fn subscribe_mailbox_messages(
86		&self,
87		since_checkpoint: Option<u64>,
88	) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
89		let (mut srv, _) = self.require_server().await?;
90
91		let checkpoint = if let Some(since) = since_checkpoint {
92			since
93		} else {
94			self.get_mailbox_checkpoint().await?
95		};
96
97		// we just need a short authorization for the stream initialization
98		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
99		let auth = self.mailbox_authorization(expiry);
100		let mailbox_id = auth.mailbox();
101
102		let req = protos::mailbox_server::MailboxRequest {
103			unblinded_id: mailbox_id.serialize(),
104			authorization: Some(auth.serialize()),
105			checkpoint: checkpoint,
106		};
107
108		let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
109			let m = m.context("received error on mailbox message stream")?;
110			Ok::<_, anyhow::Error>(m)
111		});
112
113		Ok(stream)
114	}
115
116	/// Similar to [Wallet::subscribe_mailbox_messages] but it will also process each mailbox
117	/// message indefinitely. This method won't stop until the given `shutdown` `CancellationToken`
118	/// is triggered.
119	///
120	/// If `since_checkpoint` is `None`, the stream will start from the last checkpoint stored in
121	/// the database.
122	///
123	/// Returns only once the stream is closed.
124	pub async fn subscribe_process_mailbox_messages(
125		&self,
126		since_checkpoint: Option<u64>,
127		shutdown: CancellationToken,
128	) -> anyhow::Result<()> {
129		let mut reconnect_count = 0;
130		const MAX_RECONNECT_ATTEMPTS: usize = 5;
131
132		'outer: loop {
133			let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
134			trace!("Connected to mailbox stream with server");
135
136			loop {
137				futures::select! {
138					message = stream.next().fuse() => {
139						match message {
140							Some(Ok(message)) => {
141								reconnect_count = 0;
142								self.process_mailbox_message(message).await;
143							},
144							// A tonic h2 stream reset is almost always a
145							// proxy- or server-side idle timeout rather than
146							// a real failure; resubscribe quietly.
147							Some(Err(e)) if crate::utils::is_h2_stream_error(&e) => {
148								reconnect_count = 0;
149								trace!("Mailbox stream reset by server, reconnecting: {e:#}");
150								continue 'outer;
151							},
152							Some(Err(e)) => {
153								return Err(e).context("error on mailbox message stream");
154							},
155							None if reconnect_count >= MAX_RECONNECT_ATTEMPTS => {
156								bail!("Mailbox stream dropped by server, giving up to retry later");
157							},
158							None => {
159								reconnect_count += 1;
160								warn!("Mailbox stream dropped by server, reconnecting");
161								continue 'outer;
162							},
163						}
164					},
165					_ = shutdown.cancelled().fuse() => {
166						info!("Shutdown signal received! Shutting mailbox messages process...");
167						return Ok(());
168					},
169				}
170			}
171		}
172	}
173
174	/// Sync with the mailbox on the Ark server and look for out-of-round received VTXOs.
175	pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
176		let (mut srv, _) = self.require_server().await?;
177
178		// we should be able to do all our syncing in 10 minutes
179		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
180		let auth = self.mailbox_authorization(expiry);
181		let mailbox_id = auth.mailbox();
182
183		for _ in 0..MAX_MAILBOX_REQUEST_BURST {
184			let checkpoint = self.get_mailbox_checkpoint().await?;
185			let mailbox_req = protos::mailbox_server::MailboxRequest {
186				unblinded_id: mailbox_id.serialize(),
187				authorization: Some(auth.serialize()),
188				checkpoint,
189			};
190
191			let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
192				.context("error fetching mailbox")?.into_inner();
193			debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
194
195			for mailbox_msg in mailbox_resp.messages {
196				self.process_mailbox_message(mailbox_msg).await;
197			}
198
199			if !mailbox_resp.have_more {
200				break;
201			}
202		}
203
204		Ok(())
205	}
206
207	/// Turn raw byte arrays into VTXOs, then validate them.
208	///
209	/// This function doesn't return a result on purpose,
210	/// because we want to make sure we don't early return on
211	/// the first error. This ensure we process all VTXOs, even
212	/// if some are invalid, and print everything we received.
213	async fn process_raw_vtxos(
214		&self,
215		raw_vtxos: Vec<Vec<u8>>,
216	) -> Vec<Vtxo<Full>> {
217		let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
218		let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
219
220		for bytes in &raw_vtxos {
221			let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
222				Ok(vtxo) => vtxo,
223				Err(e) => {
224					error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
225					invalid_vtxos.push(bytes);
226					continue;
227				}
228			};
229
230			if let Err(e) = self.validate_vtxo(&vtxo).await {
231				error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
232				invalid_vtxos.push(bytes);
233				continue;
234			}
235
236			valid_vtxos.push(vtxo);
237		}
238
239		// We log all invalid VTXOs to keep track
240		if !invalid_vtxos.is_empty() {
241			error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
242		}
243
244		valid_vtxos
245	}
246
247	pub(crate) async fn process_mailbox_message(
248		&self,
249		mailbox_msg: MailboxMessage,
250	) {
251		use protos::mailbox_server::mailbox_message::Message;
252
253		// Each arm returns whether the checkpoint should advance. Only
254		// arkoor returns false on processing error so the server
255		// redelivers and we retry. Every other arm advances regardless,
256		// either because the work is idempotent and re-done on every
257		// wallet sync, or because the message is informational/ignored.
258		let advance = match mailbox_msg.message {
259			Some(Message::Arkoor(msg)) => {
260				match self.process_received_arkoor_package(msg.vtxos).await {
261					Ok(()) => true,
262					Err(e) => {
263						error!("Error processing received arkoor package: {:#}", e);
264						false
265					}
266				}
267			}
268			Some(Message::RoundParticipationCompleted(m)) => {
269				info!("Server informed that round participation is ready, unlock_hash:{:?}",
270					UnlockHash::from_slice(&m.unlock_hash).ok(),
271				);
272				if let Err(e) = self.sync_pending_rounds().await {
273					error!("Error syncing pending rounds: {:#}", e);
274				}
275				true
276			},
277			Some(Message::IncomingLightningPayment(msg)) => {
278				if let Err(e) = self.handle_lightning_receive_notification(msg).await {
279					error!("Error handling lightning receive notification: {:#}", e);
280				}
281				true
282			},
283			Some(Message::RecoveryVtxoIds(_)) => {
284				trace!("Received recovery VTXO IDs, ignoring");
285				true
286			}
287			Some(Message::LightningSendFinished(msg)) => {
288				if let Err(e) = self.handle_lightning_send_finished(msg, mailbox_msg.checkpoint).await {
289					error!("Error handling lightning send finished notification: {:#}", e);
290				}
291				true
292			}
293			None => {
294				warn!("Received unknown mailbox message kind at checkpoint {}; bark may need to be upgraded",
295					mailbox_msg.checkpoint);
296				true
297			}
298		};
299
300		if advance {
301			if let Err(e) = self.store_mailbox_checkpoint(mailbox_msg.checkpoint).await {
302				error!("Error storing mailbox checkpoint: {:#}", e);
303			}
304		}
305	}
306
307	async fn process_received_arkoor_package(
308		&self,
309		raw_vtxos: Vec<Vec<u8>>,
310	) -> anyhow::Result<()> {
311		let vtxos = self.process_raw_vtxos(raw_vtxos).await;
312
313		let mut new_vtxos = Vec::with_capacity(vtxos.len());
314		for vtxo in &vtxos {
315			// Skip if already in wallet
316			if self.inner.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
317				debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
318				continue;
319			}
320
321			trace!("Received arkoor VTXO {} for {}", vtxo.id(), vtxo.amount());
322			new_vtxos.push(vtxo);
323		}
324
325		if new_vtxos.is_empty() {
326			return Ok(());
327		}
328
329		// Redundantly re-register the received vtxos with the server. An
330		// up-to-date sender already does this after cosign, but older
331		// senders may not, so we do it on receive too to make sure the
332		// server has signed_tx rows for our spendable vtxos. Any failure
333		// is logged and swallowed: the receive must still proceed so we
334		// don't lose track of the vtxos locally, and later spends will
335		// retry registration if still needed.
336		if let Err(e) = self.register_vtxo_transactions_with_server(&new_vtxos).await {
337			warn!("Failed to register received arkoor vtxo transactions with server: {:#}", e);
338		}
339
340		let balance = vtxos
341			.iter()
342			.map(|vtxo| vtxo.amount()).sum::<Amount>()
343			.to_signed()?;
344		self.store_spendable_vtxos(&vtxos).await?;
345
346		// Build received_on destinations from received VTXOs, aggregated by address
347		let mut received_by_address = HashMap::<ark::Address, Amount>::new();
348		for vtxo in &vtxos {
349			if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
350				if let Ok(address) = self.peek_address(index).await {
351					*received_by_address.entry(address).or_default() += vtxo.amount();
352				}
353			}
354		}
355		let received_on: Vec<_> = received_by_address
356			.iter()
357			.map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
358			.collect();
359
360		let movement_id = self.inner.movements.new_finished_movement(
361			Subsystem::ARKOOR,
362			ArkoorMovement::Receive.to_string(),
363			MovementStatus::Successful,
364			MovementUpdate::new()
365				.produced_vtxos(&vtxos)
366				.intended_and_effective_balance(balance)
367				.received_on(received_on),
368		).await?;
369
370		info!("Received arkoor (movement {}) for {}", movement_id, balance);
371
372		Ok(())
373	}
374
375	/// Handle a lightning receive notification from the mailbox.
376	///
377	/// This is a signal that the server has received a lightning payment for us
378	/// and we should come online to claim it.
379	async fn handle_lightning_receive_notification(
380		&self,
381		notif: protos::mailbox_server::IncomingLightningPaymentMessage,
382	) -> anyhow::Result<()> {
383		let payment_hash = PaymentHash::try_from(notif.payment_hash)
384			.context("invalid payment hash in lightning receive notification")?;
385
386		debug!("Lightning receive notification: payment_hash={}", payment_hash);
387
388		match self.try_claim_lightning_receive(payment_hash, false, None).await {
389			Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
390			Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
391		}
392
393		Ok(())
394	}
395
396	/// Handle a lightning send finished notification from the mailbox.
397	///
398	/// This notification indicates that the server has completed processing
399	/// a lightning payment we initiated, either successfully or with failure.
400	async fn handle_lightning_send_finished(
401		&self,
402		notif: protos::mailbox_server::LightningSendFinishedMessage,
403		checkpoint: u64,
404	) -> anyhow::Result<()> {
405		let payment_hash = PaymentHash::try_from(notif.payment_hash)
406			.context("invalid payment hash in lightning send finished notification")?;
407
408		let known_preimage = notif.preimage
409			.and_then(|bytes| Preimage::try_from(bytes).ok());
410
411		if known_preimage.is_some() {
412			debug!("Lightning send finished notification (success): payment_hash={}", payment_hash);
413		} else {
414			debug!("Lightning send finished notification (failed): payment_hash={}", payment_hash);
415		}
416
417		// Errors are logged but not propagated: we always advance the
418		// mailbox checkpoint to avoid re-processing the same
419		// notification on the next poll.
420		match self.is_invoice_paid(payment_hash).await {
421			Ok(true) => {
422				debug!("Lightning send {} already settled; ignoring notification", payment_hash);
423			},
424			Ok(false) => {
425				let lookup = self.lightning_send_checkpoint(payment_hash).await;
426				match lookup {
427					Ok(Some(send)) => {
428						let result = match (&send.progress, known_preimage) {
429							(Progress::PaymentInitiated(htlcs), Some(preimage)) => {
430								let htlcs = htlcs.clone();
431								self.settle_lightning_send_with_preimage(send, htlcs, preimage).await
432							},
433							(Progress::PaymentInitiated(_), None) => {
434								self.drive_action(send, DriveMode::UntilParkOrDone).await
435							},
436							_ => {
437								debug!("Lightning send finished notification for {} but checkpoint is not PaymentInitiated; ignoring", payment_hash);
438								Ok(())
439							},
440						};
441						match result {
442							Ok(()) => info!("Processed lightning send finished for {}", payment_hash),
443							Err(e) => error!("Failed to process lightning send finished for {}: {:#}", payment_hash, e),
444						}
445					},
446					Ok(None) => {
447						warn!("Lightning send finished notification for unknown payment hash {}", payment_hash);
448					},
449					Err(e) => {
450						error!("Failed to look up lightning send checkpoint for {}: {:#}", payment_hash, e);
451					},
452				}
453			},
454			Err(e) => {
455				error!("Failed to look up paid_invoice for {}: {:#}", payment_hash, e);
456			},
457		}
458
459		self.store_mailbox_checkpoint(checkpoint).await?;
460		Ok(())
461	}
462
463	/// Post vtxo IDs to the server's recovery mailbox
464	pub async fn post_recovery_vtxo_ids(
465		&self,
466		vtxo_ids: impl IntoIterator<Item = VtxoId>,
467	) -> anyhow::Result<()> {
468		let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
469		if vtxo_ids.is_empty() {
470			return Ok(());
471		}
472		let nb_vtxos = vtxo_ids.len();
473
474		let (mut srv, _) = self.require_server().await?;
475		let unblinded_id = self.recovery_mailbox_identifier().serialize();
476		let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };
477
478		srv.mailbox_client.post_recovery_vtxo_ids(req).await
479			.context("error posting recovery vtxo IDs to server")?;
480
481		debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
482		Ok(())
483	}
484
485	/// Return the stored mailbox checkpoint — the tip position the wallet
486	/// has consumed up to. After a successful [`Self::sync_mailbox`], this value
487	/// reflects the server's latest advertised tip.
488	pub async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
489		Ok(self.inner.db.get_mailbox_checkpoint().await?)
490	}
491
492	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
493		Ok(self.inner.db.store_mailbox_checkpoint(checkpoint).await?)
494	}
495}