Skip to main content

bark/
mailbox.rs

1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9
10use anyhow::Context;
11use ark::tree::signed::UnlockHash;
12use bitcoin::hashes::Hash;
13use bitcoin::Amount;
14use bitcoin::hex::DisplayHex;
15use bitcoin::secp256k1::Keypair;
16use futures::{FutureExt, Stream, StreamExt};
17use log::{debug, error, info, trace, warn};
18use tokio_util::sync::CancellationToken;
19
20use ark::{ProtocolEncoding, Vtxo, VtxoId};
21use ark::lightning::{PaymentHash, Preimage};
22use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
23use ark::vtxo::Full;
24use server_rpc::protos;
25use server_rpc::protos::mailbox_server::MailboxMessage;
26
27use crate::Wallet;
28use crate::movement::{MovementDestination, MovementStatus};
29use crate::movement::update::MovementUpdate;
30use crate::subsystem::{ArkoorMovement, Subsystem};
31use crate::utils::time::Instant;
32
33
34/// The maximum number of times we will call the fetch mailbox endpoint in one go
35///
36/// We can't trust the server to honestly tell us to keep trying more forever.
37/// A malicious server could send us empty messages or invalid messages and
38/// lock up our resources forever. So we limit the number of times we will fetch.
39/// If a user actually has more messages left, he will have to call sync again.
40///
41/// (Note that currently the server sends 100 messages per fetch, so this would
42/// only happen for users with more than 1000 pending items.)
43const MAX_MAILBOX_REQUEST_BURST: usize = 10;
44
45impl Wallet {
46	/// Get the keypair used for the server mailbox
47	pub fn mailbox_keypair(&self) -> Keypair {
48		self.inner.seed.to_mailbox_keypair()
49	}
50
51	/// Get the keypair used for the server recovery mailbox
52	pub fn recovery_mailbox_keypair(&self) -> Keypair {
53		self.inner.seed.to_recovery_mailbox_keypair()
54	}
55
56	/// Get this wallet's server mailbox ID
57	pub fn mailbox_identifier(&self) -> MailboxIdentifier {
58		let mailbox_kp = self.mailbox_keypair();
59		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
60	}
61
62	/// Get this wallet's server recovery mailbox ID
63	pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
64		let mailbox_kp = self.recovery_mailbox_keypair();
65		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
66	}
67
68	/// Create a mailbox authorization that is valid until the given expiry time
69	///
70	/// This authorization can be used by third parties to lookup your mailbox
71	/// with the Ark server.
72	pub fn mailbox_authorization(
73		&self,
74		authorization_expiry: chrono::DateTime<chrono::Local>,
75	) -> MailboxAuthorization {
76		MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
77	}
78
79	/// Subscribe to mailbox message stream.
80	///
81	/// If `since` is `None`, the stream will start from the last checkpoint stored in the database.
82	///
83	/// Returns a stream of mailbox messages.
84	pub async fn subscribe_mailbox_messages(
85		&self,
86		since_checkpoint: Option<u64>,
87	) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
88		let (mut srv, _) = self.require_server().await?;
89
90		let checkpoint = if let Some(since) = since_checkpoint {
91			since
92		} else {
93			self.get_mailbox_checkpoint().await?
94		};
95
96		// we just need a short authorization for the stream initialization
97		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
98		let auth = self.mailbox_authorization(expiry);
99		let mailbox_id = auth.mailbox();
100
101		let req = protos::mailbox_server::MailboxRequest {
102			unblinded_id: mailbox_id.serialize(),
103			authorization: Some(auth.serialize()),
104			checkpoint: checkpoint,
105		};
106
107		let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
108			let m = m.context("received error on mailbox message stream")?;
109			Ok::<_, anyhow::Error>(m)
110		});
111
112		Ok(stream)
113	}
114
115	/// Similar to [Wallet::subscribe_mailbox_messages] but it will also process each mailbox
116	/// message indefinitely. This method won't stop until the given `shutdown` `CancellationToken`
117	/// is triggered.
118	///
119	/// If `since_checkpoint` is `None`, the stream will start from the last checkpoint stored in
120	/// the database.
121	///
122	/// Returns only once the stream is closed.
123	pub async fn subscribe_process_mailbox_messages(
124		&self,
125		since_checkpoint: Option<u64>,
126		shutdown: CancellationToken,
127	) -> anyhow::Result<()> {
128		let mut reconnect_count = 0;
129		const MAX_RECONNECT_ATTEMPTS: usize = 5;
130
131		'outer: loop {
132			let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
133			let connected_at = Instant::now();
134			trace!("Connected to mailbox stream with server");
135
136			loop {
137				futures::select! {
138					message = stream.next().fuse() => {
139						if let Some(message) = message {
140							reconnect_count = 0;
141							let message = message.context("error on mailbox message stream")?;
142							self.process_mailbox_message(message).await;
143						} else if connected_at.elapsed() >= crate::HEALTHY_STREAM_DURATION {
144							// Stream lived long enough that this is likely a
145							// normal idle timeout, not a misbehaving server.
146							reconnect_count = 0;
147							info!("Mailbox stream closed after healthy session, reconnecting");
148							continue 'outer;
149						} else if reconnect_count >= MAX_RECONNECT_ATTEMPTS {
150							bail!("Mailbox stream dropped by server, giving up to retry later");
151						} else {
152							reconnect_count += 1;
153							warn!("Mailbox stream dropped by server, reconnecting");
154							continue 'outer;
155						}
156					},
157					_ = shutdown.cancelled().fuse() => {
158						info!("Shutdown signal received! Shutting mailbox messages process...");
159						return Ok(());
160					},
161				}
162			}
163		}
164	}
165
166	/// Sync with the mailbox on the Ark server and look for out-of-round received VTXOs.
167	pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
168		let (mut srv, _) = self.require_server().await?;
169
170		// we should be able to do all our syncing in 10 minutes
171		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
172		let auth = self.mailbox_authorization(expiry);
173		let mailbox_id = auth.mailbox();
174
175		for _ in 0..MAX_MAILBOX_REQUEST_BURST {
176			let checkpoint = self.get_mailbox_checkpoint().await?;
177			let mailbox_req = protos::mailbox_server::MailboxRequest {
178				unblinded_id: mailbox_id.serialize(),
179				authorization: Some(auth.serialize()),
180				checkpoint,
181			};
182
183			let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
184				.context("error fetching mailbox")?.into_inner();
185			debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
186
187			for mailbox_msg in mailbox_resp.messages {
188				self.process_mailbox_message(mailbox_msg).await;
189			}
190
191			if !mailbox_resp.have_more {
192				break;
193			}
194		}
195
196		Ok(())
197	}
198
199	/// Turn raw byte arrays into VTXOs, then validate them.
200	///
201	/// This function doesn't return a result on purpose,
202	/// because we want to make sure we don't early return on
203	/// the first error. This ensure we process all VTXOs, even
204	/// if some are invalid, and print everything we received.
205	async fn process_raw_vtxos(
206		&self,
207		raw_vtxos: Vec<Vec<u8>>,
208	) -> Vec<Vtxo<Full>> {
209		let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
210		let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
211
212		for bytes in &raw_vtxos {
213			let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
214				Ok(vtxo) => vtxo,
215				Err(e) => {
216					error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
217					invalid_vtxos.push(bytes);
218					continue;
219				}
220			};
221
222			if let Err(e) = self.validate_vtxo(&vtxo).await {
223				error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
224				invalid_vtxos.push(bytes);
225				continue;
226			}
227
228			valid_vtxos.push(vtxo);
229		}
230
231		// We log all invalid VTXOs to keep track
232		if !invalid_vtxos.is_empty() {
233			error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
234		}
235
236		valid_vtxos
237	}
238
239	pub(crate) async fn process_mailbox_message(
240		&self,
241		mailbox_msg: MailboxMessage,
242	) {
243		use protos::mailbox_server::mailbox_message::Message;
244
245		// Each arm returns whether the checkpoint should advance. Only
246		// arkoor returns false on processing error so the server
247		// redelivers and we retry. Every other arm advances regardless,
248		// either because the work is idempotent and re-done on every
249		// wallet sync, or because the message is informational/ignored.
250		let advance = match mailbox_msg.message {
251			Some(Message::Arkoor(msg)) => {
252				match self.process_received_arkoor_package(msg.vtxos).await {
253					Ok(()) => true,
254					Err(e) => {
255						error!("Error processing received arkoor package: {:#}", e);
256						false
257					}
258				}
259			}
260			Some(Message::RoundParticipationCompleted(m)) => {
261				info!("Server informed that round participation is ready, unlock_hash:{:?}",
262					UnlockHash::from_slice(&m.unlock_hash).ok(),
263				);
264				if let Err(e) = self.sync_pending_rounds().await {
265					error!("Error syncing pending rounds: {:#}", e);
266				}
267				true
268			},
269			Some(Message::IncomingLightningPayment(msg)) => {
270				if let Err(e) = self.handle_lightning_receive_notification(msg).await {
271					error!("Error handling lightning receive notification: {:#}", e);
272				}
273				true
274			},
275			Some(Message::RecoveryVtxoIds(_)) => {
276				trace!("Received recovery VTXO IDs, ignoring");
277				true
278			}
279			Some(Message::LightningSendFinished(msg)) => {
280				if let Err(e) = self.handle_lightning_send_finished(msg, mailbox_msg.checkpoint).await {
281					error!("Error handling lightning send finished notification: {:#}", e);
282				}
283				true
284			}
285			None => {
286				warn!("Received unknown mailbox message kind at checkpoint {}; bark may need to be upgraded",
287					mailbox_msg.checkpoint);
288				true
289			}
290		};
291
292		if advance {
293			if let Err(e) = self.store_mailbox_checkpoint(mailbox_msg.checkpoint).await {
294				error!("Error storing mailbox checkpoint: {:#}", e);
295			}
296		}
297	}
298
299	async fn process_received_arkoor_package(
300		&self,
301		raw_vtxos: Vec<Vec<u8>>,
302	) -> anyhow::Result<()> {
303		let vtxos = self.process_raw_vtxos(raw_vtxos).await;
304
305		let mut new_vtxos = Vec::with_capacity(vtxos.len());
306		for vtxo in &vtxos {
307			// Skip if already in wallet
308			if self.inner.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
309				debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
310				continue;
311			}
312
313			trace!("Received arkoor VTXO {} for {}", vtxo.id(), vtxo.amount());
314			new_vtxos.push(vtxo);
315		}
316
317		if new_vtxos.is_empty() {
318			return Ok(());
319		}
320
321		// Redundantly re-register the received vtxos with the server. An
322		// up-to-date sender already does this after cosign, but older
323		// senders may not, so we do it on receive too to make sure the
324		// server has signed_tx rows for our spendable vtxos. Any failure
325		// is logged and swallowed: the receive must still proceed so we
326		// don't lose track of the vtxos locally, and later spends will
327		// retry registration if still needed.
328		if let Err(e) = self.register_vtxo_transactions_with_server(&new_vtxos).await {
329			warn!("Failed to register received arkoor vtxo transactions with server: {:#}", e);
330		}
331
332		let balance = vtxos
333			.iter()
334			.map(|vtxo| vtxo.amount()).sum::<Amount>()
335			.to_signed()?;
336		self.store_spendable_vtxos(&vtxos).await?;
337
338		// Build received_on destinations from received VTXOs, aggregated by address
339		let mut received_by_address = HashMap::<ark::Address, Amount>::new();
340		for vtxo in &vtxos {
341			if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
342				if let Ok(address) = self.peek_address(index).await {
343					*received_by_address.entry(address).or_default() += vtxo.amount();
344				}
345			}
346		}
347		let received_on: Vec<_> = received_by_address
348			.iter()
349			.map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
350			.collect();
351
352		let movement_id = self.inner.movements.new_finished_movement(
353			Subsystem::ARKOOR,
354			ArkoorMovement::Receive.to_string(),
355			MovementStatus::Successful,
356			MovementUpdate::new()
357				.produced_vtxos(&vtxos)
358				.intended_and_effective_balance(balance)
359				.received_on(received_on),
360		).await?;
361
362		info!("Received arkoor (movement {}) for {}", movement_id, balance);
363
364		Ok(())
365	}
366
367	/// Handle a lightning receive notification from the mailbox.
368	///
369	/// This is a signal that the server has received a lightning payment for us
370	/// and we should come online to claim it.
371	async fn handle_lightning_receive_notification(
372		&self,
373		notif: protos::mailbox_server::IncomingLightningPaymentMessage,
374	) -> anyhow::Result<()> {
375		let payment_hash = PaymentHash::try_from(notif.payment_hash)
376			.context("invalid payment hash in lightning receive notification")?;
377
378		debug!("Lightning receive notification: payment_hash={}", payment_hash);
379
380		match self.try_claim_lightning_receive(payment_hash, false, None).await {
381			Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
382			Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
383		}
384
385		Ok(())
386	}
387
388	/// Handle a lightning send finished notification from the mailbox.
389	///
390	/// This notification indicates that the server has completed processing
391	/// a lightning payment we initiated, either successfully or with failure.
392	async fn handle_lightning_send_finished(
393		&self,
394		notif: protos::mailbox_server::LightningSendFinishedMessage,
395		checkpoint: u64,
396	) -> anyhow::Result<()> {
397		let payment_hash = PaymentHash::try_from(notif.payment_hash)
398			.context("invalid payment hash in lightning send finished notification")?;
399
400		let known_preimage = notif.preimage
401			.and_then(|bytes| Preimage::try_from(bytes).ok());
402
403		if known_preimage.is_some() {
404			debug!("Lightning send finished notification (success): payment_hash={}", payment_hash);
405		} else {
406			debug!("Lightning send finished notification (failed): payment_hash={}", payment_hash);
407		}
408
409		// Trigger the payment check flow which will handle success/revocation.
410		// When we already have the preimage, this skips the server RPC.
411		// Errors are logged but not propagated: we always advance the checkpoint
412		// to avoid re-processing the same notification on the next poll.
413		match self.check_lightning_payment_with_preimage(payment_hash, known_preimage).await {
414			Ok(_) => info!("Processed lightning send finished for {}", payment_hash),
415			Err(e) => error!("Failed to process lightning send finished for {}: {:#}", payment_hash, e),
416		}
417
418		self.store_mailbox_checkpoint(checkpoint).await?;
419		Ok(())
420	}
421
422	/// Post vtxo IDs to the server's recovery mailbox
423	pub async fn post_recovery_vtxo_ids(
424		&self,
425		vtxo_ids: impl IntoIterator<Item = VtxoId>,
426	) -> anyhow::Result<()> {
427		let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
428		if vtxo_ids.is_empty() {
429			return Ok(());
430		}
431		let nb_vtxos = vtxo_ids.len();
432
433		let (mut srv, _) = self.require_server().await?;
434		let unblinded_id = self.recovery_mailbox_identifier().serialize();
435		let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };
436
437		srv.mailbox_client.post_recovery_vtxo_ids(req).await
438			.context("error posting recovery vtxo IDs to server")?;
439
440		debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
441		Ok(())
442	}
443
444	/// Return the stored mailbox checkpoint — the tip position the wallet
445	/// has consumed up to. After a successful [`Self::sync_mailbox`], this value
446	/// reflects the server's latest advertised tip.
447	pub async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
448		Ok(self.inner.db.get_mailbox_checkpoint().await?)
449	}
450
451	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
452		Ok(self.inner.db.store_mailbox_checkpoint(checkpoint).await?)
453	}
454}