1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9
10use anyhow::Context;
11use ark::tree::signed::UnlockHash;
12use bitcoin::hashes::Hash;
13use bitcoin::Amount;
14use bitcoin::hex::DisplayHex;
15use bitcoin::secp256k1::Keypair;
16use futures::{FutureExt, Stream, StreamExt};
17use log::{debug, error, info, trace, warn};
18use tokio_util::sync::CancellationToken;
19
20use ark::{ProtocolEncoding, Vtxo, VtxoId};
21use ark::lightning::{PaymentHash, Preimage};
22use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
23use ark::vtxo::Full;
24use server_rpc::protos;
25use server_rpc::protos::mailbox_server::MailboxMessage;
26
27use crate::Wallet;
28use crate::movement::{MovementDestination, MovementStatus};
29use crate::movement::update::MovementUpdate;
30use crate::subsystem::{ArkoorMovement, Subsystem};
31
32
/// Upper bound on the number of consecutive `read_mailbox` requests that a single
/// call to `Wallet::sync_mailbox` issues, even if the server keeps reporting
/// that more messages are available.
const MAX_MAILBOX_REQUEST_BURST: usize = 10;
43
44impl Wallet {
45 pub fn mailbox_keypair(&self) -> Keypair {
47 self.inner.seed.to_mailbox_keypair()
48 }
49
50 pub fn recovery_mailbox_keypair(&self) -> Keypair {
52 self.inner.seed.to_recovery_mailbox_keypair()
53 }
54
55 pub fn mailbox_identifier(&self) -> MailboxIdentifier {
57 let mailbox_kp = self.mailbox_keypair();
58 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
59 }
60
61 pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
63 let mailbox_kp = self.recovery_mailbox_keypair();
64 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
65 }
66
67 pub fn mailbox_authorization(
72 &self,
73 authorization_expiry: chrono::DateTime<chrono::Local>,
74 ) -> MailboxAuthorization {
75 MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
76 }
77
78 pub async fn subscribe_mailbox_messages(
84 &self,
85 since_checkpoint: Option<u64>,
86 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
87 let (mut srv, _) = self.require_server().await?;
88
89 let checkpoint = if let Some(since) = since_checkpoint {
90 since
91 } else {
92 self.get_mailbox_checkpoint().await?
93 };
94
95 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
97 let auth = self.mailbox_authorization(expiry);
98 let mailbox_id = auth.mailbox();
99
100 let req = protos::mailbox_server::MailboxRequest {
101 unblinded_id: mailbox_id.serialize(),
102 authorization: Some(auth.serialize()),
103 checkpoint: checkpoint,
104 };
105
106 let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
107 let m = m.context("received error on mailbox message stream")?;
108 Ok::<_, anyhow::Error>(m)
109 });
110
111 Ok(stream)
112 }
113
114 pub async fn subscribe_process_mailbox_messages(
123 &self,
124 since_checkpoint: Option<u64>,
125 shutdown: CancellationToken,
126 ) -> anyhow::Result<()> {
127 let mut reconnect_count = 0;
128 const MAX_RECONNECT_ATTEMPTS: usize = 5;
129
130 'outer: loop {
131 let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
132 let connected_at = std::time::Instant::now();
133 trace!("Connected to mailbox stream with server");
134
135 loop {
136 futures::select! {
137 message = stream.next().fuse() => {
138 if let Some(message) = message {
139 reconnect_count = 0;
140 let message = message.context("error on mailbox message stream")?;
141 self.process_mailbox_message(message).await;
142 } else if connected_at.elapsed() >= crate::HEALTHY_STREAM_DURATION {
143 reconnect_count = 0;
146 info!("Mailbox stream closed after healthy session, reconnecting");
147 continue 'outer;
148 } else if reconnect_count >= MAX_RECONNECT_ATTEMPTS {
149 bail!("Mailbox stream dropped by server, giving up to retry later");
150 } else {
151 reconnect_count += 1;
152 warn!("Mailbox stream dropped by server, reconnecting");
153 continue 'outer;
154 }
155 },
156 _ = shutdown.cancelled().fuse() => {
157 info!("Shutdown signal received! Shutting mailbox messages process...");
158 return Ok(());
159 },
160 }
161 }
162 }
163 }
164
165 pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
167 let (mut srv, _) = self.require_server().await?;
168
169 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
171 let auth = self.mailbox_authorization(expiry);
172 let mailbox_id = auth.mailbox();
173
174 for _ in 0..MAX_MAILBOX_REQUEST_BURST {
175 let checkpoint = self.get_mailbox_checkpoint().await?;
176 let mailbox_req = protos::mailbox_server::MailboxRequest {
177 unblinded_id: mailbox_id.serialize(),
178 authorization: Some(auth.serialize()),
179 checkpoint,
180 };
181
182 let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
183 .context("error fetching mailbox")?.into_inner();
184 debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
185
186 for mailbox_msg in mailbox_resp.messages {
187 self.process_mailbox_message(mailbox_msg).await;
188 }
189
190 if !mailbox_resp.have_more {
191 break;
192 }
193 }
194
195 Ok(())
196 }
197
198 async fn process_raw_vtxos(
205 &self,
206 raw_vtxos: Vec<Vec<u8>>,
207 ) -> Vec<Vtxo<Full>> {
208 let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
209 let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
210
211 for bytes in &raw_vtxos {
212 let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
213 Ok(vtxo) => vtxo,
214 Err(e) => {
215 error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
216 invalid_vtxos.push(bytes);
217 continue;
218 }
219 };
220
221 if let Err(e) = self.validate_vtxo(&vtxo).await {
222 error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
223 invalid_vtxos.push(bytes);
224 continue;
225 }
226
227 valid_vtxos.push(vtxo);
228 }
229
230 if !invalid_vtxos.is_empty() {
232 error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
233 }
234
235 valid_vtxos
236 }
237
238 pub(crate) async fn process_mailbox_message(
239 &self,
240 mailbox_msg: MailboxMessage,
241 ) {
242 use protos::mailbox_server::mailbox_message::Message;
243
244 let advance = match mailbox_msg.message {
250 Some(Message::Arkoor(msg)) => {
251 match self.process_received_arkoor_package(msg.vtxos).await {
252 Ok(()) => true,
253 Err(e) => {
254 error!("Error processing received arkoor package: {:#}", e);
255 false
256 }
257 }
258 }
259 Some(Message::RoundParticipationCompleted(m)) => {
260 info!("Server informed that round participation is ready, unlock_hash:{:?}",
261 UnlockHash::from_slice(&m.unlock_hash).ok(),
262 );
263 if let Err(e) = self.sync_pending_rounds().await {
264 error!("Error syncing pending rounds: {:#}", e);
265 }
266 true
267 },
268 Some(Message::IncomingLightningPayment(msg)) => {
269 if let Err(e) = self.handle_lightning_receive_notification(msg).await {
270 error!("Error handling lightning receive notification: {:#}", e);
271 }
272 true
273 },
274 Some(Message::RecoveryVtxoIds(_)) => {
275 trace!("Received recovery VTXO IDs, ignoring");
276 true
277 }
278 Some(Message::LightningSendFinished(msg)) => {
279 if let Err(e) = self.handle_lightning_send_finished(msg, mailbox_msg.checkpoint).await {
280 error!("Error handling lightning send finished notification: {:#}", e);
281 }
282 true
283 }
284 None => {
285 warn!("Received unknown mailbox message kind at checkpoint {}; bark may need to be upgraded",
286 mailbox_msg.checkpoint);
287 true
288 }
289 };
290
291 if advance {
292 if let Err(e) = self.store_mailbox_checkpoint(mailbox_msg.checkpoint).await {
293 error!("Error storing mailbox checkpoint: {:#}", e);
294 }
295 }
296 }
297
298 async fn process_received_arkoor_package(
299 &self,
300 raw_vtxos: Vec<Vec<u8>>,
301 ) -> anyhow::Result<()> {
302 let vtxos = self.process_raw_vtxos(raw_vtxos).await;
303
304 let mut new_vtxos = Vec::with_capacity(vtxos.len());
305 for vtxo in &vtxos {
306 if self.inner.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
308 debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
309 continue;
310 }
311
312 trace!("Received arkoor VTXO {} for {}", vtxo.id(), vtxo.amount());
313 new_vtxos.push(vtxo);
314 }
315
316 if new_vtxos.is_empty() {
317 return Ok(());
318 }
319
320 if let Err(e) = self.register_vtxo_transactions_with_server(&new_vtxos).await {
328 warn!("Failed to register received arkoor vtxo transactions with server: {:#}", e);
329 }
330
331 let balance = vtxos
332 .iter()
333 .map(|vtxo| vtxo.amount()).sum::<Amount>()
334 .to_signed()?;
335 self.store_spendable_vtxos(&vtxos).await?;
336
337 let mut received_by_address = HashMap::<ark::Address, Amount>::new();
339 for vtxo in &vtxos {
340 if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
341 if let Ok(address) = self.peek_address(index).await {
342 *received_by_address.entry(address).or_default() += vtxo.amount();
343 }
344 }
345 }
346 let received_on: Vec<_> = received_by_address
347 .iter()
348 .map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
349 .collect();
350
351 let movement_id = self.inner.movements.new_finished_movement(
352 Subsystem::ARKOOR,
353 ArkoorMovement::Receive.to_string(),
354 MovementStatus::Successful,
355 MovementUpdate::new()
356 .produced_vtxos(&vtxos)
357 .intended_and_effective_balance(balance)
358 .received_on(received_on),
359 ).await?;
360
361 info!("Received arkoor (movement {}) for {}", movement_id, balance);
362
363 Ok(())
364 }
365
366 async fn handle_lightning_receive_notification(
371 &self,
372 notif: protos::mailbox_server::IncomingLightningPaymentMessage,
373 ) -> anyhow::Result<()> {
374 let payment_hash = PaymentHash::try_from(notif.payment_hash)
375 .context("invalid payment hash in lightning receive notification")?;
376
377 debug!("Lightning receive notification: payment_hash={}", payment_hash);
378
379 match self.try_claim_lightning_receive(payment_hash, false, None).await {
380 Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
381 Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
382 }
383
384 Ok(())
385 }
386
387 async fn handle_lightning_send_finished(
392 &self,
393 notif: protos::mailbox_server::LightningSendFinishedMessage,
394 checkpoint: u64,
395 ) -> anyhow::Result<()> {
396 let payment_hash = PaymentHash::try_from(notif.payment_hash)
397 .context("invalid payment hash in lightning send finished notification")?;
398
399 let known_preimage = notif.preimage
400 .and_then(|bytes| Preimage::try_from(bytes).ok());
401
402 if known_preimage.is_some() {
403 debug!("Lightning send finished notification (success): payment_hash={}", payment_hash);
404 } else {
405 debug!("Lightning send finished notification (failed): payment_hash={}", payment_hash);
406 }
407
408 match self.check_lightning_payment_with_preimage(payment_hash, known_preimage).await {
413 Ok(_) => info!("Processed lightning send finished for {}", payment_hash),
414 Err(e) => error!("Failed to process lightning send finished for {}: {:#}", payment_hash, e),
415 }
416
417 self.store_mailbox_checkpoint(checkpoint).await?;
418 Ok(())
419 }
420
421 pub async fn post_recovery_vtxo_ids(
423 &self,
424 vtxo_ids: impl IntoIterator<Item = VtxoId>,
425 ) -> anyhow::Result<()> {
426 let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
427 if vtxo_ids.is_empty() {
428 return Ok(());
429 }
430 let nb_vtxos = vtxo_ids.len();
431
432 let (mut srv, _) = self.require_server().await?;
433 let unblinded_id = self.recovery_mailbox_identifier().serialize();
434 let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };
435
436 srv.mailbox_client.post_recovery_vtxo_ids(req).await
437 .context("error posting recovery vtxo IDs to server")?;
438
439 debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
440 Ok(())
441 }
442
443 pub async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
447 Ok(self.inner.db.get_mailbox_checkpoint().await?)
448 }
449
450 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
451 Ok(self.inner.db.store_mailbox_checkpoint(checkpoint).await?)
452 }
453}