Skip to main content

bark/
mailbox.rs

1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9
10use anyhow::Context;
11use bitcoin::Amount;
12use bitcoin::hex::DisplayHex;
13use bitcoin::secp256k1::Keypair;
14use futures::{Stream, StreamExt};
15use log::{debug, error, info, trace, warn};
16
17use ark::{ProtocolEncoding, Vtxo, VtxoId};
18use ark::lightning::PaymentHash;
19use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
20use ark::vtxo::Full;
21use server_rpc::protos;
22use server_rpc::protos::mailbox_server::MailboxMessage;
23
24use crate::Wallet;
25use crate::movement::{MovementDestination, MovementStatus};
26use crate::movement::update::MovementUpdate;
27use crate::subsystem::{ArkoorMovement, Subsystem};
28
29
30/// The maximum number of times we will call the fetch mailbox endpoint in one go
31///
32/// We can't trust the server to honestly tell us to keep trying more forever.
33/// A malicious server could send us empty messages or invalid messages and
34/// lock up our resources forever. So we limit the number of times we will fetch.
35/// If a user actually has more messages left, he will have to call sync again.
36///
37/// (Note that currently the server sends 100 messages per fetch, so this would
38/// only happen for users with more than 1000 pending items.)
39const MAX_MAILBOX_REQUEST_BURST: usize = 10;
40
41impl Wallet {
42	/// Get the keypair used for the server mailbox
43	pub fn mailbox_keypair(&self) -> Keypair {
44		self.seed.to_mailbox_keypair()
45	}
46
47	/// Get the keypair used for the server recovery mailbox
48	pub fn recovery_mailbox_keypair(&self) -> Keypair {
49		self.seed.to_recovery_mailbox_keypair()
50	}
51
52	/// Get this wallet's server mailbox ID
53	pub fn mailbox_identifier(&self) -> MailboxIdentifier {
54		let mailbox_kp = self.mailbox_keypair();
55		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
56	}
57
58	/// Get this wallet's server recovery mailbox ID
59	pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
60		let mailbox_kp = self.recovery_mailbox_keypair();
61		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
62	}
63
64	/// Create a mailbox authorization that is valid until the given expiry time
65	///
66	/// This authorization can be used by third parties to lookup your mailbox
67	/// with the Ark server.
68	pub fn mailbox_authorization(
69		&self,
70		authorization_expiry: chrono::DateTime<chrono::Local>,
71	) -> MailboxAuthorization {
72		MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
73	}
74
75	/// Subscribe to mailbox message stream.
76	///
77	/// If `since` is `None`, the stream will start from the last checkpoint stored in the database.
78	///
79	/// Returns a stream of mailbox messages.
80	pub async fn subscribe_mailbox_messages(
81		&self,
82		since_checkpoint: Option<u64>,
83	) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
84		let (mut srv, _) = self.require_server().await?;
85
86		let checkpoint = if let Some(since) = since_checkpoint {
87			since
88		} else {
89			self.get_mailbox_checkpoint().await?
90		};
91
92		// we just need a short authorization for the stream initialization
93		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
94		let auth = self.mailbox_authorization(expiry);
95		let mailbox_id = auth.mailbox();
96
97		let req = protos::mailbox_server::MailboxRequest {
98			unblinded_id: mailbox_id.to_vec(),
99			authorization: Some(auth.serialize()),
100			checkpoint: checkpoint,
101		};
102
103		let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
104			let m = m.context("received error on mailbox message stream")?;
105			Ok::<_, anyhow::Error>(m)
106		});
107
108		Ok(stream)
109	}
110
111	/// Subscribe to mailbox message stream and process each incoming message.
112	///
113	/// If `since` is `None`, the stream will start from the last checkpoint stored in the database.
114	///
115	/// Returns only once the stream is closed.
116	pub async fn subscribe_process_mailbox_messages(
117		&self,
118		since_checkpoint: Option<u64>,
119	) -> anyhow::Result<()> {
120		let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
121		while let Some(message) = stream.next().await {
122			let message = if let Ok(message) = message {
123				message
124			} else {
125				error!("Error receiving mailbox message: {:#}", message.unwrap_err());
126				continue;
127			};
128
129			self.process_mailbox_message(message).await;
130		}
131
132		Ok(())
133	}
134
135	/// Sync with the mailbox on the Ark server and look for out-of-round received VTXOs.
136	pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
137		let (mut srv, _) = self.require_server().await?;
138
139		// we should be able to do all our syncing in 10 minutes
140		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
141		let auth = self.mailbox_authorization(expiry);
142		let mailbox_id = auth.mailbox();
143
144		for _ in 0..MAX_MAILBOX_REQUEST_BURST {
145			let checkpoint = self.get_mailbox_checkpoint().await?;
146			let mailbox_req = protos::mailbox_server::MailboxRequest {
147				unblinded_id: mailbox_id.to_vec(),
148				authorization: Some(auth.serialize()),
149				checkpoint,
150			};
151
152			let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
153				.context("error fetching mailbox")?.into_inner();
154			debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
155
156			for mailbox_msg in mailbox_resp.messages {
157				self.process_mailbox_message(mailbox_msg).await;
158			}
159
160			if !mailbox_resp.have_more {
161				break;
162			}
163		}
164
165		Ok(())
166	}
167
168	/// Turn raw byte arrays into VTXOs, then validate them.
169	///
170	/// This function doesn't return a result on purpose,
171	/// because we want to make sure we don't early return on
172	/// the first error. This ensure we process all VTXOs, even
173	/// if some are invalid, and print everything we received.
174	async fn process_raw_vtxos(
175		&self,
176		raw_vtxos: Vec<Vec<u8>>,
177	) -> Vec<Vtxo<Full>> {
178		let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
179		let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
180
181		for bytes in &raw_vtxos {
182			let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
183				Ok(vtxo) => vtxo,
184				Err(e) => {
185					error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
186					invalid_vtxos.push(bytes);
187					continue;
188				}
189			};
190
191			if let Err(e) = self.validate_vtxo(&vtxo).await {
192				error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
193				invalid_vtxos.push(bytes);
194				continue;
195			}
196
197			valid_vtxos.push(vtxo);
198		}
199
200		// We log all invalid VTXOs to keep track
201		if !invalid_vtxos.is_empty() {
202			error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
203		}
204
205		valid_vtxos
206	}
207
208	async fn process_mailbox_message(
209		&self,
210		mailbox_msg: MailboxMessage,
211	) {
212		match mailbox_msg.message {
213			Some(protos::mailbox_server::mailbox_message::Message::Arkoor(msg)) => {
214				let result = self
215					.process_received_arkoor_package(msg.vtxos, Some(mailbox_msg.checkpoint)).await;
216				if let Err(e) = result {
217					error!("Error processing received arkoor package: {:#}", e);
218				}
219			}
220			Some(protos::mailbox_server::mailbox_message::Message::RoundParticipationCompleted(_)) => {
221				// Do we want to do custom code paths for progressing the round participations
222				// via the payment hashes returnded by the server?
223				if let Err(e) = self.sync_pending_rounds().await {
224					error!("Error syncing pending rounds: {:#}", e);
225				}
226			},
227			Some(protos::mailbox_server::mailbox_message::Message::IncomingLightningPayment(msg)) => {
228				if let Err(e) = self.handle_lightning_receive_notification(msg, mailbox_msg.checkpoint).await {
229					error!("Error handling lightning receive notification: {:#}", e);
230				}
231			},
232			Some(protos::mailbox_server::mailbox_message::Message::RecoveryVtxoIds(_)) => {
233				trace!("Received recovery VTXO IDs, ignoring");
234			}
235			None => {
236				warn!("Received unknown mailbox message, ignoring");
237			}
238		}
239	}
240
241	async fn process_received_arkoor_package(
242		&self,
243		raw_vtxos: Vec<Vec<u8>>,
244		checkpoint: Option<u64>,
245	) -> anyhow::Result<()> {
246		let vtxos = self.process_raw_vtxos(raw_vtxos).await;
247
248		let mut new_vtxos = Vec::with_capacity(vtxos.len());
249		for vtxo in &vtxos {
250			// Skip if already in wallet
251			if self.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
252				debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
253				continue;
254			}
255
256			trace!("Received arkoor VTXO {} for {} (checkpoint {:?})", vtxo.id(), vtxo.amount(), checkpoint);
257			new_vtxos.push(vtxo);
258		}
259
260		if new_vtxos.is_empty() {
261			return Ok(());
262		}
263
264		let balance = vtxos
265			.iter()
266			.map(|vtxo| vtxo.amount()).sum::<Amount>()
267			.to_signed()?;
268		self.store_spendable_vtxos(&vtxos).await?;
269
270		// Build received_on destinations from received VTXOs, aggregated by address
271		let mut received_by_address = HashMap::<ark::Address, Amount>::new();
272		for vtxo in &vtxos {
273			if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
274				if let Ok(address) = self.peek_address(index).await {
275					*received_by_address.entry(address).or_default() += vtxo.amount();
276				}
277			}
278		}
279		let received_on: Vec<_> = received_by_address
280			.iter()
281			.map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
282			.collect();
283
284		let movement_id = self.movements.new_finished_movement(
285			Subsystem::ARKOOR,
286			ArkoorMovement::Receive.to_string(),
287			MovementStatus::Successful,
288			MovementUpdate::new()
289				.produced_vtxos(&vtxos)
290				.intended_and_effective_balance(balance)
291				.received_on(received_on),
292		).await?;
293
294		info!("Received arkoor (movement {}) for {}", movement_id, balance);
295
296		if let Some(checkpoint) = checkpoint {
297			self.store_mailbox_checkpoint(checkpoint).await?;
298		}
299
300		Ok(())
301	}
302
303	/// Handle a lightning receive notification from the mailbox.
304	///
305	/// This is a signal that the server has received a lightning payment for us
306	/// and we should come online to claim it.
307	async fn handle_lightning_receive_notification(
308		&self,
309		notif: protos::mailbox_server::IncomingLightningPaymentMessage,
310		checkpoint: u64,
311	) -> anyhow::Result<()> {
312		let payment_hash = PaymentHash::try_from(notif.payment_hash)
313			.context("invalid payment hash in lightning receive notification")?;
314
315		debug!("Lightning receive notification: payment_hash={}", payment_hash);
316
317		match self.try_claim_lightning_receive(payment_hash, false, None).await {
318			Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
319			Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
320		}
321
322		self.store_mailbox_checkpoint(checkpoint).await?;
323		Ok(())
324	}
325
326	/// Post vtxo IDs to the server's recovery mailbox
327	pub async fn post_recovery_vtxo_ids(
328		&self,
329		vtxo_ids: impl IntoIterator<Item = VtxoId>,
330	) -> anyhow::Result<()> {
331		let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
332		if vtxo_ids.is_empty() {
333			return Ok(());
334		}
335		let nb_vtxos = vtxo_ids.len();
336
337		let (mut srv, _) = self.require_server().await?;
338		let unblinded_id = self.recovery_mailbox_identifier().to_vec();
339		let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };
340
341		srv.mailbox_client.post_recovery_vtxo_ids(req).await
342			.context("error posting recovery vtxo IDs to server")?;
343
344		debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
345		Ok(())
346	}
347
348	async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
349		Ok(self.db.get_mailbox_checkpoint().await?)
350	}
351
352	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
353		Ok(self.db.store_mailbox_checkpoint(checkpoint).await?)
354	}
355}