Skip to main content

bark/
mailbox.rs

1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9use std::ops::ControlFlow;
10
11use anyhow::Context;
12use ark::tree::signed::UnlockHash;
13use bitcoin::hashes::Hash;
14use bitcoin::Amount;
15use bitcoin::hex::DisplayHex;
16use bitcoin::secp256k1::Keypair;
17use futures::{FutureExt, Stream, StreamExt};
18use log::{debug, error, info, trace, warn};
19use tokio_util::sync::CancellationToken;
20
21use ark::{ProtocolEncoding, Vtxo, VtxoId};
22use ark::lightning::{PaymentHash, Preimage};
23use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
24use ark::vtxo::Full;
25use server_rpc::protos;
26use server_rpc::protos::mailbox_server::MailboxMessage;
27
28use crate::{Wallet, SUBSCRIBE_REQUEST_TIMEOUT};
29use crate::actions::DriveMode;
30use crate::actions::lightning::pay::Progress;
31use crate::movement::{MovementDestination, MovementStatus};
32use crate::movement::update::MovementUpdate;
33use crate::subsystem::{ArkoorMovement, Subsystem};
34
35
/// The maximum number of times we will call the fetch mailbox endpoint in one go.
///
/// We can't trust the server to honestly tell us to keep trying more forever.
/// A malicious server could send us empty messages or invalid messages and
/// lock up our resources forever, so we limit the number of times we will fetch.
/// If a user actually has more messages left, they will have to call sync again.
///
/// (Note that currently the server sends 100 messages per fetch, so this would
/// only happen for users with more than 1000 pending items.)
const MAX_MAILBOX_REQUEST_BURST: usize = 10;
46
/// Key for the lock that serializes the arkoor receive dedup within a wallet.
///
/// Several consumers of one wallet process the same mailbox messages: the
/// daemon's always-on stream ([`Wallet::subscribe_process_mailbox_messages`])
/// runs alongside the startup / periodic `sync()` (which calls
/// [`Wallet::sync_mailbox`]), and a REST deployment can fire concurrent
/// `/sync` and `/sync/mailbox` requests. The server hands each consumer the
/// same messages from its checkpoint, so without serialization the
/// peek-then-store dedup in arkoor processing races: every consumer that
/// passes the [`crate::persist::BarkPersister::get_wallet_vtxo`] check before
/// the others have stored the package records its own receive movement,
/// double-counting the receive.
///
/// Holding this lock across that check-then-store (in
/// [`Wallet::process_received_arkoor_package`]) makes it atomic across
/// consumers. The VTXO row is `INSERT OR IGNORE`, so once one consumer has
/// stored the package the rest see the VTXO already present and skip the
/// movement.
const MAILBOX_PROCESSING_LOCK_KEY: &str = "mailbox.processing";
65
/// How long a consumer waits for the arkoor dedup lock before giving up
/// (30 seconds).
///
/// The critical section is a couple of server round trips plus local DB
/// writes, normally well under a second. On timeout we error rather than
/// block, handled like any other arkoor processing error: this message's
/// checkpoint isn't advanced and processing halts (see
/// [`Wallet::process_mailbox_message`]) so a later message can't bury it. A
/// timeout only happens if a holder is stuck for the full duration; normally
/// waiters acquire the lock and dedup as usual.
const MAILBOX_PROCESSING_LOCK_TIMEOUT: std::time::Duration =
	std::time::Duration::from_secs(30);
77
78impl Wallet {
79	/// Get the keypair used for the server mailbox
80	pub fn mailbox_keypair(&self) -> Keypair {
81		self.inner.seed.to_mailbox_keypair()
82	}
83
84	/// Get the keypair used for the server recovery mailbox
85	pub fn recovery_mailbox_keypair(&self) -> Keypair {
86		self.inner.seed.to_recovery_mailbox_keypair()
87	}
88
89	/// Get this wallet's server mailbox ID
90	pub fn mailbox_identifier(&self) -> MailboxIdentifier {
91		let mailbox_kp = self.mailbox_keypair();
92		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
93	}
94
95	/// Get this wallet's server recovery mailbox ID
96	pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
97		let mailbox_kp = self.recovery_mailbox_keypair();
98		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
99	}
100
101	/// Create a mailbox authorization that is valid until the given expiry time
102	///
103	/// This authorization can be used by third parties to lookup your mailbox
104	/// with the Ark server.
105	pub fn mailbox_authorization(
106		&self,
107		authorization_expiry: chrono::DateTime<chrono::Local>,
108	) -> MailboxAuthorization {
109		MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
110	}
111
112	/// Subscribe to mailbox message stream.
113	///
114	/// If `since` is `None`, the stream will start from the last checkpoint stored in the database.
115	///
116	/// Returns a stream of mailbox messages.
117	pub async fn subscribe_mailbox_messages(
118		&self,
119		since_checkpoint: Option<u64>,
120	) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
121		let (mut srv, _) = self.require_server().await?;
122
123		let checkpoint = if let Some(since) = since_checkpoint {
124			since
125		} else {
126			self.get_mailbox_checkpoint().await?
127		};
128
129		// we just need a short authorization for the stream initialization
130		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
131		let auth = self.mailbox_authorization(expiry);
132		let mailbox_id = auth.mailbox();
133
134		let mut req = tonic::IntoRequest::into_request(protos::mailbox_server::MailboxRequest {
135			mailbox_id: mailbox_id.serialize(),
136			authorization: Some(auth.serialize()),
137			checkpoint: checkpoint,
138		});
139		req.set_timeout(SUBSCRIBE_REQUEST_TIMEOUT);
140
141		let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
142			let m = m.context("received error on mailbox message stream")?;
143			Ok::<_, anyhow::Error>(m)
144		});
145
146		Ok(stream)
147	}
148
149	/// Similar to [Wallet::subscribe_mailbox_messages] but it will also process each mailbox
150	/// message indefinitely. This method won't stop until the given `shutdown` `CancellationToken`
151	/// is triggered.
152	///
153	/// If `since_checkpoint` is `None`, the stream will start from the last checkpoint stored in
154	/// the database.
155	///
156	/// Returns only once the stream is closed.
157	pub async fn subscribe_process_mailbox_messages(
158		&self,
159		since_checkpoint: Option<u64>,
160		shutdown: CancellationToken,
161	) -> anyhow::Result<()> {
162		let mut reconnect_count = 0;
163		const MAX_RECONNECT_ATTEMPTS: usize = 5;
164
165		'outer: loop {
166			let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
167			trace!("Connected to mailbox stream with server");
168
169			loop {
170				futures::select! {
171					message = stream.next().fuse() => {
172						match message {
173							Some(Ok(message)) => {
174								reconnect_count = 0;
175								if self.process_mailbox_message(message).await.is_break() {
176									// A message failed without advancing its
177									// checkpoint. Stop consuming this stream and
178									// resubscribe from the unadvanced checkpoint
179									// so it's redelivered before a later message
180									// can bury it.
181									trace!("Halting mailbox stream after unadvanced message; resubscribing");
182									continue 'outer;
183								}
184							},
185							// A tonic h2 stream reset is almost always a
186							// proxy- or server-side idle timeout rather than
187							// a real failure; resubscribe quietly.
188							Some(Err(e)) if crate::utils::is_h2_stream_error(&e) => {
189								reconnect_count = 0;
190								trace!("Mailbox stream reset by server, reconnecting: {e:#}");
191								continue 'outer;
192							},
193							Some(Err(e)) => {
194								return Err(e).context("error on mailbox message stream");
195							},
196							None if reconnect_count >= MAX_RECONNECT_ATTEMPTS => {
197								bail!("Mailbox stream dropped by server, giving up to retry later");
198							},
199							None => {
200								reconnect_count += 1;
201								warn!("Mailbox stream dropped by server, reconnecting");
202								continue 'outer;
203							},
204						}
205					},
206					_ = shutdown.cancelled().fuse() => {
207						info!("Shutdown signal received! Shutting mailbox messages process...");
208						return Ok(());
209					},
210				}
211			}
212		}
213	}
214
215	/// Sync with the mailbox on the Ark server and look for out-of-round received VTXOs.
216	pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
217		let (mut srv, _) = self.require_server().await?;
218
219		// we should be able to do all our syncing in 10 minutes
220		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
221		let auth = self.mailbox_authorization(expiry);
222		let mailbox_id = auth.mailbox();
223
224		for _ in 0..MAX_MAILBOX_REQUEST_BURST {
225			let checkpoint = self.get_mailbox_checkpoint().await?;
226			let mailbox_req = protos::mailbox_server::MailboxRequest {
227				mailbox_id: mailbox_id.serialize(),
228				authorization: Some(auth.serialize()),
229				checkpoint,
230			};
231
232			let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
233				.context("error fetching mailbox")?.into_inner();
234			debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
235
236			for mailbox_msg in mailbox_resp.messages {
237				if self.process_mailbox_message(mailbox_msg).await.is_break() {
238					// A message failed without advancing its checkpoint. Stop
239					// so we don't advance past it; the next sync retries from
240					// the same checkpoint.
241					return Ok(());
242				}
243			}
244
245			if !mailbox_resp.have_more {
246				break;
247			}
248		}
249
250		Ok(())
251	}
252
253	/// Turn raw byte arrays into VTXOs, then validate them.
254	///
255	/// This function doesn't return a result on purpose,
256	/// because we want to make sure we don't early return on
257	/// the first error. This ensure we process all VTXOs, even
258	/// if some are invalid, and print everything we received.
259	async fn process_raw_vtxos(
260		&self,
261		raw_vtxos: Vec<Vec<u8>>,
262	) -> Vec<Vtxo<Full>> {
263		let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
264		let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
265
266		for bytes in &raw_vtxos {
267			let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
268				Ok(vtxo) => vtxo,
269				Err(e) => {
270					error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
271					invalid_vtxos.push(bytes);
272					continue;
273				}
274			};
275
276			if let Err(e) = self.validate_vtxo(&vtxo).await {
277				error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
278				invalid_vtxos.push(bytes);
279				continue;
280			}
281
282			valid_vtxos.push(vtxo);
283		}
284
285		// We log all invalid VTXOs to keep track
286		if !invalid_vtxos.is_empty() {
287			error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
288		}
289
290		valid_vtxos
291	}
292
293	/// Process a single mailbox message and report whether the caller should
294	/// keep consuming the mailbox.
295	///
296	/// Returns [`ControlFlow::Break`] when an arkoor package failed to process
297	/// and its checkpoint was therefore not advanced. Because checkpoints are
298	/// monotonic, the caller must stop before a later message stores a higher
299	/// checkpoint and buries the unprocessed one; the next sync/resubscribe
300	/// re-fetches from the unadvanced checkpoint and retries.
301	pub(crate) async fn process_mailbox_message(
302		&self,
303		mailbox_msg: MailboxMessage,
304	) -> ControlFlow<()> {
305		use protos::mailbox_server::mailbox_message::Message;
306
307		// Each arm returns whether the checkpoint should advance. Only
308		// arkoor returns false on processing error so the server
309		// redelivers and we retry. Every other arm advances regardless,
310		// either because the work is idempotent and re-done on every
311		// wallet sync, or because the message is informational/ignored.
312		let advance = match mailbox_msg.message {
313			Some(Message::Arkoor(msg)) => {
314				match self.process_received_arkoor_package(msg.vtxos).await {
315					Ok(()) => true,
316					Err(e) => {
317						error!("Error processing received arkoor package: {:#}", e);
318						false
319					}
320				}
321			}
322			Some(Message::RoundParticipationCompleted(m)) => {
323				info!("Server informed that round participation is ready, unlock_hash:{:?}",
324					UnlockHash::from_slice(&m.unlock_hash).ok(),
325				);
326				if let Err(e) = self.sync_pending_rounds().await {
327					error!("Error syncing pending rounds: {:#}", e);
328				}
329				true
330			},
331			Some(Message::IncomingLightningPayment(msg)) => {
332				if let Err(e) = self.handle_lightning_receive_notification(msg).await {
333					error!("Error handling lightning receive notification: {:#}", e);
334				}
335				true
336			},
337			Some(Message::RecoveryVtxoIds(_)) => {
338				trace!("Received recovery VTXO IDs, ignoring");
339				true
340			}
341			Some(Message::LightningSendFinished(msg)) => {
342				if let Err(e) = self.handle_lightning_send_finished(msg, mailbox_msg.checkpoint).await {
343					error!("Error handling lightning send finished notification: {:#}", e);
344				}
345				true
346			}
347			None => {
348				warn!("Received unknown mailbox message kind at checkpoint {}; bark may need to be upgraded",
349					mailbox_msg.checkpoint);
350				true
351			}
352		};
353
354		if advance {
355			if let Err(e) = self.store_mailbox_checkpoint(mailbox_msg.checkpoint).await {
356				error!("Error storing mailbox checkpoint: {:#}", e);
357			}
358			ControlFlow::Continue(())
359		} else {
360			// An arkoor package didn't process and its checkpoint wasn't
361			// advanced. Stop here so a later message can't store a higher
362			// checkpoint and bury it.
363			ControlFlow::Break(())
364		}
365	}
366
367	async fn process_received_arkoor_package(
368		&self,
369		raw_vtxos: Vec<Vec<u8>>,
370	) -> anyhow::Result<()> {
371		let vtxos = self.process_raw_vtxos(raw_vtxos).await;
372
373		// Serialize the receive dedup across all consumers of this wallet's
374		// mailbox so two of them can't both record a movement for the same
375		// package. See MAILBOX_PROCESSING_LOCK_KEY. On lock failure we
376		// return like any other arkoor processing error, leaving this
377		// message's checkpoint unadvanced.
378		let _guard = self.inner.lock_manager.lock(
379			MAILBOX_PROCESSING_LOCK_KEY, MAILBOX_PROCESSING_LOCK_TIMEOUT,
380		).await.context("failed to acquire mailbox processing lock")?;
381
382		let mut new_vtxos = Vec::with_capacity(vtxos.len());
383		for vtxo in &vtxos {
384			// Skip if already in wallet
385			if self.inner.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
386				debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
387				continue;
388			}
389
390			trace!("Received arkoor VTXO {} for {}", vtxo.id(), vtxo.amount());
391			new_vtxos.push(vtxo);
392		}
393
394		if new_vtxos.is_empty() {
395			return Ok(());
396		}
397
398		// Redundantly re-register the received vtxos with the server. An
399		// up-to-date sender already does this after cosign, but older
400		// senders may not, so we do it on receive too to make sure the
401		// server has signed_tx rows for our spendable vtxos. Any failure
402		// is logged and swallowed: the receive must still proceed so we
403		// don't lose track of the vtxos locally, and later spends will
404		// retry registration if still needed.
405		if let Err(e) = self.register_vtxo_transactions_with_server(&new_vtxos).await {
406			warn!("Failed to register received arkoor vtxo transactions with server: {:#}", e);
407		}
408
409		let balance = vtxos
410			.iter()
411			.map(|vtxo| vtxo.amount()).sum::<Amount>()
412			.to_signed()?;
413		self.store_spendable_vtxos(&vtxos).await?;
414
415		// Build received_on destinations from received VTXOs, aggregated by address
416		let mut received_by_address = HashMap::<ark::Address, Amount>::new();
417		for vtxo in &vtxos {
418			if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
419				if let Ok(address) = self.peek_address(index).await {
420					*received_by_address.entry(address).or_default() += vtxo.amount();
421				}
422			}
423		}
424		let received_on: Vec<_> = received_by_address
425			.iter()
426			.map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
427			.collect();
428
429		let movement_id = self.inner.movements.new_finished_movement(
430			Subsystem::ARKOOR,
431			ArkoorMovement::Receive.to_string(),
432			MovementStatus::Successful,
433			MovementUpdate::new()
434				.produced_vtxos(&vtxos)
435				.intended_and_effective_balance(balance)
436				.received_on(received_on),
437		).await?;
438
439		info!("Received arkoor (movement {}) for {}", movement_id, balance);
440
441		Ok(())
442	}
443
444	/// Handle a lightning receive notification from the mailbox.
445	///
446	/// This is a signal that the server has received a lightning payment for us
447	/// and we should come online to claim it.
448	async fn handle_lightning_receive_notification(
449		&self,
450		notif: protos::mailbox_server::IncomingLightningPaymentMessage,
451	) -> anyhow::Result<()> {
452		let payment_hash = PaymentHash::try_from(notif.payment_hash)
453			.context("invalid payment hash in lightning receive notification")?;
454
455		debug!("Lightning receive notification: payment_hash={}", payment_hash);
456
457		match self.try_claim_lightning_receive(payment_hash, false, None).await {
458			Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
459			Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
460		}
461
462		Ok(())
463	}
464
465	/// Handle a lightning send finished notification from the mailbox.
466	///
467	/// This notification indicates that the server has completed processing
468	/// a lightning payment we initiated, either successfully or with failure.
469	async fn handle_lightning_send_finished(
470		&self,
471		notif: protos::mailbox_server::LightningSendFinishedMessage,
472		checkpoint: u64,
473	) -> anyhow::Result<()> {
474		let payment_hash = PaymentHash::try_from(notif.payment_hash)
475			.context("invalid payment hash in lightning send finished notification")?;
476
477		let known_preimage = notif.preimage
478			.and_then(|bytes| Preimage::try_from(bytes).ok());
479
480		if known_preimage.is_some() {
481			debug!("Lightning send finished notification (success): payment_hash={}", payment_hash);
482		} else {
483			debug!("Lightning send finished notification (failed): payment_hash={}", payment_hash);
484		}
485
486		// Errors are logged but not propagated: we always advance the
487		// mailbox checkpoint to avoid re-processing the same
488		// notification on the next poll.
489		match self.is_invoice_paid(payment_hash).await {
490			Ok(true) => {
491				debug!("Lightning send {} already settled; ignoring notification", payment_hash);
492			},
493			Ok(false) => {
494				let lookup = self.lightning_send_checkpoint(payment_hash).await;
495				match lookup {
496					Ok(Some(send)) => {
497						let result = match (&send.progress, known_preimage) {
498							(Progress::PaymentInitiated(htlcs), Some(preimage)) => {
499								let htlcs = htlcs.clone();
500								self.settle_lightning_send_with_preimage(send, htlcs, preimage).await
501							},
502							(Progress::PaymentInitiated(_), None) => {
503								self.drive_action(send, DriveMode::UntilParkOrDone).await
504							},
505							_ => {
506								debug!("Lightning send finished notification for {} but checkpoint is not PaymentInitiated; ignoring", payment_hash);
507								Ok(())
508							},
509						};
510						match result {
511							Ok(()) => info!("Processed lightning send finished for {}", payment_hash),
512							Err(e) => error!("Failed to process lightning send finished for {}: {:#}", payment_hash, e),
513						}
514					},
515					Ok(None) => {
516						warn!("Lightning send finished notification for unknown payment hash {}", payment_hash);
517					},
518					Err(e) => {
519						error!("Failed to look up lightning send checkpoint for {}: {:#}", payment_hash, e);
520					},
521				}
522			},
523			Err(e) => {
524				error!("Failed to look up paid_invoice for {}: {:#}", payment_hash, e);
525			},
526		}
527
528		self.store_mailbox_checkpoint(checkpoint).await?;
529		Ok(())
530	}
531
532	/// Post vtxo IDs to the server's recovery mailbox
533	pub async fn post_recovery_vtxo_ids(
534		&self,
535		vtxo_ids: impl IntoIterator<Item = VtxoId>,
536	) -> anyhow::Result<()> {
537		let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
538		if vtxo_ids.is_empty() {
539			return Ok(());
540		}
541		let nb_vtxos = vtxo_ids.len();
542
543		let (mut srv, _) = self.require_server().await?;
544		let mailbox_id = self.recovery_mailbox_identifier().serialize();
545		let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { mailbox_id, vtxo_ids };
546
547		srv.mailbox_client.post_recovery_vtxo_ids(req).await
548			.context("error posting recovery vtxo IDs to server")?;
549
550		debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
551		Ok(())
552	}
553
554	/// Return the stored mailbox checkpoint — the tip position the wallet
555	/// has consumed up to. After a successful [`Self::sync_mailbox`], this value
556	/// reflects the server's latest advertised tip.
557	pub async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
558		Ok(self.inner.db.get_mailbox_checkpoint().await?)
559	}
560
561	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
562		Ok(self.inner.db.store_mailbox_checkpoint(checkpoint).await?)
563	}
564}