Skip to main content

bark/
mailbox.rs

1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9
10use anyhow::Context;
11use ark::tree::signed::UnlockHash;
12use bitcoin::hashes::Hash;
13use bitcoin::Amount;
14use bitcoin::hex::DisplayHex;
15use bitcoin::secp256k1::Keypair;
16use futures::{FutureExt, Stream, StreamExt};
17use log::{debug, error, info, trace, warn};
18use tokio_util::sync::CancellationToken;
19
20use ark::{ProtocolEncoding, Vtxo, VtxoId};
21use ark::lightning::{PaymentHash, Preimage};
22use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
23use ark::vtxo::Full;
24use server_rpc::protos;
25use server_rpc::protos::mailbox_server::MailboxMessage;
26
27use crate::Wallet;
28use crate::movement::{MovementDestination, MovementStatus};
29use crate::movement::update::MovementUpdate;
30use crate::subsystem::{ArkoorMovement, Subsystem};
31
32
33/// The maximum number of times we will call the fetch mailbox endpoint in one go
34///
35/// We can't trust the server to honestly tell us to keep trying more forever.
36/// A malicious server could send us empty messages or invalid messages and
37/// lock up our resources forever. So we limit the number of times we will fetch.
38/// If a user actually has more messages left, he will have to call sync again.
39///
40/// (Note that currently the server sends 100 messages per fetch, so this would
41/// only happen for users with more than 1000 pending items.)
42const MAX_MAILBOX_REQUEST_BURST: usize = 10;
43
44impl Wallet {
45	/// Get the keypair used for the server mailbox
46	pub fn mailbox_keypair(&self) -> Keypair {
47		self.inner.seed.to_mailbox_keypair()
48	}
49
50	/// Get the keypair used for the server recovery mailbox
51	pub fn recovery_mailbox_keypair(&self) -> Keypair {
52		self.inner.seed.to_recovery_mailbox_keypair()
53	}
54
55	/// Get this wallet's server mailbox ID
56	pub fn mailbox_identifier(&self) -> MailboxIdentifier {
57		let mailbox_kp = self.mailbox_keypair();
58		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
59	}
60
61	/// Get this wallet's server recovery mailbox ID
62	pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
63		let mailbox_kp = self.recovery_mailbox_keypair();
64		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
65	}
66
67	/// Create a mailbox authorization that is valid until the given expiry time
68	///
69	/// This authorization can be used by third parties to lookup your mailbox
70	/// with the Ark server.
71	pub fn mailbox_authorization(
72		&self,
73		authorization_expiry: chrono::DateTime<chrono::Local>,
74	) -> MailboxAuthorization {
75		MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
76	}
77
78	/// Subscribe to mailbox message stream.
79	///
80	/// If `since` is `None`, the stream will start from the last checkpoint stored in the database.
81	///
82	/// Returns a stream of mailbox messages.
83	pub async fn subscribe_mailbox_messages(
84		&self,
85		since_checkpoint: Option<u64>,
86	) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
87		let (mut srv, _) = self.require_server().await?;
88
89		let checkpoint = if let Some(since) = since_checkpoint {
90			since
91		} else {
92			self.get_mailbox_checkpoint().await?
93		};
94
95		// we just need a short authorization for the stream initialization
96		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
97		let auth = self.mailbox_authorization(expiry);
98		let mailbox_id = auth.mailbox();
99
100		let req = protos::mailbox_server::MailboxRequest {
101			unblinded_id: mailbox_id.serialize(),
102			authorization: Some(auth.serialize()),
103			checkpoint: checkpoint,
104		};
105
106		let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
107			let m = m.context("received error on mailbox message stream")?;
108			Ok::<_, anyhow::Error>(m)
109		});
110
111		Ok(stream)
112	}
113
114	/// Similar to [Wallet::subscribe_mailbox_messages] but it will also process each mailbox
115	/// message indefinitely. This method won't stop until the given `shutdown` `CancellationToken`
116	/// is triggered.
117	///
118	/// If `since_checkpoint` is `None`, the stream will start from the last checkpoint stored in
119	/// the database.
120	///
121	/// Returns only once the stream is closed.
122	pub async fn subscribe_process_mailbox_messages(
123		&self,
124		since_checkpoint: Option<u64>,
125		shutdown: CancellationToken,
126	) -> anyhow::Result<()> {
127		let mut reconnect_count = 0;
128		const MAX_RECONNECT_ATTEMPTS: usize = 5;
129
130		'outer: loop {
131			let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
132			let connected_at = std::time::Instant::now();
133			trace!("Connected to mailbox stream with server");
134
135			loop {
136				futures::select! {
137					message = stream.next().fuse() => {
138						if let Some(message) = message {
139							reconnect_count = 0;
140							let message = message.context("error on mailbox message stream")?;
141							self.process_mailbox_message(message).await;
142						} else if connected_at.elapsed() >= crate::HEALTHY_STREAM_DURATION {
143							// Stream lived long enough that this is likely a
144							// normal idle timeout, not a misbehaving server.
145							reconnect_count = 0;
146							info!("Mailbox stream closed after healthy session, reconnecting");
147							continue 'outer;
148						} else if reconnect_count >= MAX_RECONNECT_ATTEMPTS {
149							bail!("Mailbox stream dropped by server, giving up to retry later");
150						} else {
151							reconnect_count += 1;
152							warn!("Mailbox stream dropped by server, reconnecting");
153							continue 'outer;
154						}
155					},
156					_ = shutdown.cancelled().fuse() => {
157						info!("Shutdown signal received! Shutting mailbox messages process...");
158						return Ok(());
159					},
160				}
161			}
162		}
163	}
164
165	/// Sync with the mailbox on the Ark server and look for out-of-round received VTXOs.
166	pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
167		let (mut srv, _) = self.require_server().await?;
168
169		// we should be able to do all our syncing in 10 minutes
170		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
171		let auth = self.mailbox_authorization(expiry);
172		let mailbox_id = auth.mailbox();
173
174		for _ in 0..MAX_MAILBOX_REQUEST_BURST {
175			let checkpoint = self.get_mailbox_checkpoint().await?;
176			let mailbox_req = protos::mailbox_server::MailboxRequest {
177				unblinded_id: mailbox_id.serialize(),
178				authorization: Some(auth.serialize()),
179				checkpoint,
180			};
181
182			let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
183				.context("error fetching mailbox")?.into_inner();
184			debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
185
186			for mailbox_msg in mailbox_resp.messages {
187				self.process_mailbox_message(mailbox_msg).await;
188			}
189
190			if !mailbox_resp.have_more {
191				break;
192			}
193		}
194
195		Ok(())
196	}
197
198	/// Turn raw byte arrays into VTXOs, then validate them.
199	///
200	/// This function doesn't return a result on purpose,
201	/// because we want to make sure we don't early return on
202	/// the first error. This ensure we process all VTXOs, even
203	/// if some are invalid, and print everything we received.
204	async fn process_raw_vtxos(
205		&self,
206		raw_vtxos: Vec<Vec<u8>>,
207	) -> Vec<Vtxo<Full>> {
208		let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
209		let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
210
211		for bytes in &raw_vtxos {
212			let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
213				Ok(vtxo) => vtxo,
214				Err(e) => {
215					error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
216					invalid_vtxos.push(bytes);
217					continue;
218				}
219			};
220
221			if let Err(e) = self.validate_vtxo(&vtxo).await {
222				error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
223				invalid_vtxos.push(bytes);
224				continue;
225			}
226
227			valid_vtxos.push(vtxo);
228		}
229
230		// We log all invalid VTXOs to keep track
231		if !invalid_vtxos.is_empty() {
232			error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
233		}
234
235		valid_vtxos
236	}
237
238	pub(crate) async fn process_mailbox_message(
239		&self,
240		mailbox_msg: MailboxMessage,
241	) {
242		use protos::mailbox_server::mailbox_message::Message;
243
244		// Each arm returns whether the checkpoint should advance. Only
245		// arkoor returns false on processing error so the server
246		// redelivers and we retry. Every other arm advances regardless,
247		// either because the work is idempotent and re-done on every
248		// wallet sync, or because the message is informational/ignored.
249		let advance = match mailbox_msg.message {
250			Some(Message::Arkoor(msg)) => {
251				match self.process_received_arkoor_package(msg.vtxos).await {
252					Ok(()) => true,
253					Err(e) => {
254						error!("Error processing received arkoor package: {:#}", e);
255						false
256					}
257				}
258			}
259			Some(Message::RoundParticipationCompleted(m)) => {
260				info!("Server informed that round participation is ready, unlock_hash:{:?}",
261					UnlockHash::from_slice(&m.unlock_hash).ok(),
262				);
263				if let Err(e) = self.sync_pending_rounds().await {
264					error!("Error syncing pending rounds: {:#}", e);
265				}
266				true
267			},
268			Some(Message::IncomingLightningPayment(msg)) => {
269				if let Err(e) = self.handle_lightning_receive_notification(msg).await {
270					error!("Error handling lightning receive notification: {:#}", e);
271				}
272				true
273			},
274			Some(Message::RecoveryVtxoIds(_)) => {
275				trace!("Received recovery VTXO IDs, ignoring");
276				true
277			}
278			Some(Message::LightningSendFinished(msg)) => {
279				if let Err(e) = self.handle_lightning_send_finished(msg, mailbox_msg.checkpoint).await {
280					error!("Error handling lightning send finished notification: {:#}", e);
281				}
282				true
283			}
284			None => {
285				warn!("Received unknown mailbox message kind at checkpoint {}; bark may need to be upgraded",
286					mailbox_msg.checkpoint);
287				true
288			}
289		};
290
291		if advance {
292			if let Err(e) = self.store_mailbox_checkpoint(mailbox_msg.checkpoint).await {
293				error!("Error storing mailbox checkpoint: {:#}", e);
294			}
295		}
296	}
297
298	async fn process_received_arkoor_package(
299		&self,
300		raw_vtxos: Vec<Vec<u8>>,
301	) -> anyhow::Result<()> {
302		let vtxos = self.process_raw_vtxos(raw_vtxos).await;
303
304		let mut new_vtxos = Vec::with_capacity(vtxos.len());
305		for vtxo in &vtxos {
306			// Skip if already in wallet
307			if self.inner.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
308				debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
309				continue;
310			}
311
312			trace!("Received arkoor VTXO {} for {}", vtxo.id(), vtxo.amount());
313			new_vtxos.push(vtxo);
314		}
315
316		if new_vtxos.is_empty() {
317			return Ok(());
318		}
319
320		// Redundantly re-register the received vtxos with the server. An
321		// up-to-date sender already does this after cosign, but older
322		// senders may not, so we do it on receive too to make sure the
323		// server has signed_tx rows for our spendable vtxos. Any failure
324		// is logged and swallowed: the receive must still proceed so we
325		// don't lose track of the vtxos locally, and later spends will
326		// retry registration if still needed.
327		if let Err(e) = self.register_vtxo_transactions_with_server(&new_vtxos).await {
328			warn!("Failed to register received arkoor vtxo transactions with server: {:#}", e);
329		}
330
331		let balance = vtxos
332			.iter()
333			.map(|vtxo| vtxo.amount()).sum::<Amount>()
334			.to_signed()?;
335		self.store_spendable_vtxos(&vtxos).await?;
336
337		// Build received_on destinations from received VTXOs, aggregated by address
338		let mut received_by_address = HashMap::<ark::Address, Amount>::new();
339		for vtxo in &vtxos {
340			if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
341				if let Ok(address) = self.peek_address(index).await {
342					*received_by_address.entry(address).or_default() += vtxo.amount();
343				}
344			}
345		}
346		let received_on: Vec<_> = received_by_address
347			.iter()
348			.map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
349			.collect();
350
351		let movement_id = self.inner.movements.new_finished_movement(
352			Subsystem::ARKOOR,
353			ArkoorMovement::Receive.to_string(),
354			MovementStatus::Successful,
355			MovementUpdate::new()
356				.produced_vtxos(&vtxos)
357				.intended_and_effective_balance(balance)
358				.received_on(received_on),
359		).await?;
360
361		info!("Received arkoor (movement {}) for {}", movement_id, balance);
362
363		Ok(())
364	}
365
366	/// Handle a lightning receive notification from the mailbox.
367	///
368	/// This is a signal that the server has received a lightning payment for us
369	/// and we should come online to claim it.
370	async fn handle_lightning_receive_notification(
371		&self,
372		notif: protos::mailbox_server::IncomingLightningPaymentMessage,
373	) -> anyhow::Result<()> {
374		let payment_hash = PaymentHash::try_from(notif.payment_hash)
375			.context("invalid payment hash in lightning receive notification")?;
376
377		debug!("Lightning receive notification: payment_hash={}", payment_hash);
378
379		match self.try_claim_lightning_receive(payment_hash, false, None).await {
380			Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
381			Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
382		}
383
384		Ok(())
385	}
386
387	/// Handle a lightning send finished notification from the mailbox.
388	///
389	/// This notification indicates that the server has completed processing
390	/// a lightning payment we initiated, either successfully or with failure.
391	async fn handle_lightning_send_finished(
392		&self,
393		notif: protos::mailbox_server::LightningSendFinishedMessage,
394		checkpoint: u64,
395	) -> anyhow::Result<()> {
396		let payment_hash = PaymentHash::try_from(notif.payment_hash)
397			.context("invalid payment hash in lightning send finished notification")?;
398
399		let known_preimage = notif.preimage
400			.and_then(|bytes| Preimage::try_from(bytes).ok());
401
402		if known_preimage.is_some() {
403			debug!("Lightning send finished notification (success): payment_hash={}", payment_hash);
404		} else {
405			debug!("Lightning send finished notification (failed): payment_hash={}", payment_hash);
406		}
407
408		// Trigger the payment check flow which will handle success/revocation.
409		// When we already have the preimage, this skips the server RPC.
410		// Errors are logged but not propagated: we always advance the checkpoint
411		// to avoid re-processing the same notification on the next poll.
412		match self.check_lightning_payment_with_preimage(payment_hash, known_preimage).await {
413			Ok(_) => info!("Processed lightning send finished for {}", payment_hash),
414			Err(e) => error!("Failed to process lightning send finished for {}: {:#}", payment_hash, e),
415		}
416
417		self.store_mailbox_checkpoint(checkpoint).await?;
418		Ok(())
419	}
420
421	/// Post vtxo IDs to the server's recovery mailbox
422	pub async fn post_recovery_vtxo_ids(
423		&self,
424		vtxo_ids: impl IntoIterator<Item = VtxoId>,
425	) -> anyhow::Result<()> {
426		let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
427		if vtxo_ids.is_empty() {
428			return Ok(());
429		}
430		let nb_vtxos = vtxo_ids.len();
431
432		let (mut srv, _) = self.require_server().await?;
433		let unblinded_id = self.recovery_mailbox_identifier().serialize();
434		let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };
435
436		srv.mailbox_client.post_recovery_vtxo_ids(req).await
437			.context("error posting recovery vtxo IDs to server")?;
438
439		debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
440		Ok(())
441	}
442
443	/// Return the stored mailbox checkpoint — the tip position the wallet
444	/// has consumed up to. After a successful [`Self::sync_mailbox`], this value
445	/// reflects the server's latest advertised tip.
446	pub async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
447		Ok(self.inner.db.get_mailbox_checkpoint().await?)
448	}
449
450	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
451		Ok(self.inner.db.store_mailbox_checkpoint(checkpoint).await?)
452	}
453}