1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9use std::ops::ControlFlow;
10
11use anyhow::Context;
12use ark::tree::signed::UnlockHash;
13use bitcoin::hashes::Hash;
14use bitcoin::Amount;
15use bitcoin::hex::DisplayHex;
16use bitcoin::secp256k1::Keypair;
17use futures::{FutureExt, Stream, StreamExt};
18use log::{debug, error, info, trace, warn};
19use tokio_util::sync::CancellationToken;
20
21use ark::{ProtocolEncoding, Vtxo, VtxoId};
22use ark::lightning::{PaymentHash, Preimage};
23use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
24use ark::vtxo::Full;
25use server_rpc::protos;
26use server_rpc::protos::mailbox_server::MailboxMessage;
27
28use crate::{Wallet, SUBSCRIBE_REQUEST_TIMEOUT};
29use crate::actions::DriveMode;
30use crate::actions::lightning::pay::Progress;
31use crate::movement::{MovementDestination, MovementStatus};
32use crate::movement::update::MovementUpdate;
33use crate::subsystem::{ArkoorMovement, Subsystem};
34
35
/// Upper bound on the number of `read_mailbox` requests issued by a single
/// [`Wallet::sync_mailbox`] call.
const MAX_MAILBOX_REQUEST_BURST: usize = 10;

/// Key handed to the wallet's lock manager before a received arkoor package
/// is checked against the database and stored.
const MAILBOX_PROCESSING_LOCK_KEY: &str = "mailbox.processing";

/// Timeout argument handed to the lock manager together with
/// [`MAILBOX_PROCESSING_LOCK_KEY`].
const MAILBOX_PROCESSING_LOCK_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
77
78impl Wallet {
79 pub fn mailbox_keypair(&self) -> Keypair {
81 self.inner.seed.to_mailbox_keypair()
82 }
83
84 pub fn recovery_mailbox_keypair(&self) -> Keypair {
86 self.inner.seed.to_recovery_mailbox_keypair()
87 }
88
89 pub fn mailbox_identifier(&self) -> MailboxIdentifier {
91 let mailbox_kp = self.mailbox_keypair();
92 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
93 }
94
95 pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
97 let mailbox_kp = self.recovery_mailbox_keypair();
98 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
99 }
100
101 pub fn mailbox_authorization(
106 &self,
107 authorization_expiry: chrono::DateTime<chrono::Local>,
108 ) -> MailboxAuthorization {
109 MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
110 }
111
112 pub async fn subscribe_mailbox_messages(
118 &self,
119 since_checkpoint: Option<u64>,
120 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
121 let (mut srv, _) = self.require_server().await?;
122
123 let checkpoint = if let Some(since) = since_checkpoint {
124 since
125 } else {
126 self.get_mailbox_checkpoint().await?
127 };
128
129 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
131 let auth = self.mailbox_authorization(expiry);
132 let mailbox_id = auth.mailbox();
133
134 let mut req = tonic::IntoRequest::into_request(protos::mailbox_server::MailboxRequest {
135 mailbox_id: mailbox_id.serialize(),
136 authorization: Some(auth.serialize()),
137 checkpoint: checkpoint,
138 });
139 req.set_timeout(SUBSCRIBE_REQUEST_TIMEOUT);
140
141 let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
142 let m = m.context("received error on mailbox message stream")?;
143 Ok::<_, anyhow::Error>(m)
144 });
145
146 Ok(stream)
147 }
148
149 pub async fn subscribe_process_mailbox_messages(
158 &self,
159 since_checkpoint: Option<u64>,
160 shutdown: CancellationToken,
161 ) -> anyhow::Result<()> {
162 let mut reconnect_count = 0;
163 const MAX_RECONNECT_ATTEMPTS: usize = 5;
164
165 'outer: loop {
166 let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
167 trace!("Connected to mailbox stream with server");
168
169 loop {
170 futures::select! {
171 message = stream.next().fuse() => {
172 match message {
173 Some(Ok(message)) => {
174 reconnect_count = 0;
175 if self.process_mailbox_message(message).await.is_break() {
176 trace!("Halting mailbox stream after unadvanced message; resubscribing");
182 continue 'outer;
183 }
184 },
185 Some(Err(e)) if crate::utils::is_h2_stream_error(&e) => {
189 reconnect_count = 0;
190 trace!("Mailbox stream reset by server, reconnecting: {e:#}");
191 continue 'outer;
192 },
193 Some(Err(e)) => {
194 return Err(e).context("error on mailbox message stream");
195 },
196 None if reconnect_count >= MAX_RECONNECT_ATTEMPTS => {
197 bail!("Mailbox stream dropped by server, giving up to retry later");
198 },
199 None => {
200 reconnect_count += 1;
201 warn!("Mailbox stream dropped by server, reconnecting");
202 continue 'outer;
203 },
204 }
205 },
206 _ = shutdown.cancelled().fuse() => {
207 info!("Shutdown signal received! Shutting mailbox messages process...");
208 return Ok(());
209 },
210 }
211 }
212 }
213 }
214
215 pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
217 let (mut srv, _) = self.require_server().await?;
218
219 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
221 let auth = self.mailbox_authorization(expiry);
222 let mailbox_id = auth.mailbox();
223
224 for _ in 0..MAX_MAILBOX_REQUEST_BURST {
225 let checkpoint = self.get_mailbox_checkpoint().await?;
226 let mailbox_req = protos::mailbox_server::MailboxRequest {
227 mailbox_id: mailbox_id.serialize(),
228 authorization: Some(auth.serialize()),
229 checkpoint,
230 };
231
232 let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
233 .context("error fetching mailbox")?.into_inner();
234 debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
235
236 for mailbox_msg in mailbox_resp.messages {
237 if self.process_mailbox_message(mailbox_msg).await.is_break() {
238 return Ok(());
242 }
243 }
244
245 if !mailbox_resp.have_more {
246 break;
247 }
248 }
249
250 Ok(())
251 }
252
253 async fn process_raw_vtxos(
260 &self,
261 raw_vtxos: Vec<Vec<u8>>,
262 ) -> Vec<Vtxo<Full>> {
263 let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
264 let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
265
266 for bytes in &raw_vtxos {
267 let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
268 Ok(vtxo) => vtxo,
269 Err(e) => {
270 error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
271 invalid_vtxos.push(bytes);
272 continue;
273 }
274 };
275
276 if let Err(e) = self.validate_vtxo(&vtxo).await {
277 error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
278 invalid_vtxos.push(bytes);
279 continue;
280 }
281
282 valid_vtxos.push(vtxo);
283 }
284
285 if !invalid_vtxos.is_empty() {
287 error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
288 }
289
290 valid_vtxos
291 }
292
293 pub(crate) async fn process_mailbox_message(
302 &self,
303 mailbox_msg: MailboxMessage,
304 ) -> ControlFlow<()> {
305 use protos::mailbox_server::mailbox_message::Message;
306
307 let advance = match mailbox_msg.message {
313 Some(Message::Arkoor(msg)) => {
314 match self.process_received_arkoor_package(msg.vtxos).await {
315 Ok(()) => true,
316 Err(e) => {
317 error!("Error processing received arkoor package: {:#}", e);
318 false
319 }
320 }
321 }
322 Some(Message::RoundParticipationCompleted(m)) => {
323 info!("Server informed that round participation is ready, unlock_hash:{:?}",
324 UnlockHash::from_slice(&m.unlock_hash).ok(),
325 );
326 if let Err(e) = self.sync_pending_rounds().await {
327 error!("Error syncing pending rounds: {:#}", e);
328 }
329 true
330 },
331 Some(Message::IncomingLightningPayment(msg)) => {
332 if let Err(e) = self.handle_lightning_receive_notification(msg).await {
333 error!("Error handling lightning receive notification: {:#}", e);
334 }
335 true
336 },
337 Some(Message::RecoveryVtxoIds(_)) => {
338 trace!("Received recovery VTXO IDs, ignoring");
339 true
340 }
341 Some(Message::LightningSendFinished(msg)) => {
342 if let Err(e) = self.handle_lightning_send_finished(msg, mailbox_msg.checkpoint).await {
343 error!("Error handling lightning send finished notification: {:#}", e);
344 }
345 true
346 }
347 None => {
348 warn!("Received unknown mailbox message kind at checkpoint {}; bark may need to be upgraded",
349 mailbox_msg.checkpoint);
350 true
351 }
352 };
353
354 if advance {
355 if let Err(e) = self.store_mailbox_checkpoint(mailbox_msg.checkpoint).await {
356 error!("Error storing mailbox checkpoint: {:#}", e);
357 }
358 ControlFlow::Continue(())
359 } else {
360 ControlFlow::Break(())
364 }
365 }
366
367 async fn process_received_arkoor_package(
368 &self,
369 raw_vtxos: Vec<Vec<u8>>,
370 ) -> anyhow::Result<()> {
371 let vtxos = self.process_raw_vtxos(raw_vtxos).await;
372
373 let _guard = self.inner.lock_manager.lock(
379 MAILBOX_PROCESSING_LOCK_KEY, MAILBOX_PROCESSING_LOCK_TIMEOUT,
380 ).await.context("failed to acquire mailbox processing lock")?;
381
382 let mut new_vtxos = Vec::with_capacity(vtxos.len());
383 for vtxo in &vtxos {
384 if self.inner.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
386 debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
387 continue;
388 }
389
390 trace!("Received arkoor VTXO {} for {}", vtxo.id(), vtxo.amount());
391 new_vtxos.push(vtxo);
392 }
393
394 if new_vtxos.is_empty() {
395 return Ok(());
396 }
397
398 if let Err(e) = self.register_vtxo_transactions_with_server(&new_vtxos).await {
406 warn!("Failed to register received arkoor vtxo transactions with server: {:#}", e);
407 }
408
409 let balance = vtxos
410 .iter()
411 .map(|vtxo| vtxo.amount()).sum::<Amount>()
412 .to_signed()?;
413 self.store_spendable_vtxos(&vtxos).await?;
414
415 let mut received_by_address = HashMap::<ark::Address, Amount>::new();
417 for vtxo in &vtxos {
418 if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
419 if let Ok(address) = self.peek_address(index).await {
420 *received_by_address.entry(address).or_default() += vtxo.amount();
421 }
422 }
423 }
424 let received_on: Vec<_> = received_by_address
425 .iter()
426 .map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
427 .collect();
428
429 let movement_id = self.inner.movements.new_finished_movement(
430 Subsystem::ARKOOR,
431 ArkoorMovement::Receive.to_string(),
432 MovementStatus::Successful,
433 MovementUpdate::new()
434 .produced_vtxos(&vtxos)
435 .intended_and_effective_balance(balance)
436 .received_on(received_on),
437 ).await?;
438
439 info!("Received arkoor (movement {}) for {}", movement_id, balance);
440
441 Ok(())
442 }
443
444 async fn handle_lightning_receive_notification(
449 &self,
450 notif: protos::mailbox_server::IncomingLightningPaymentMessage,
451 ) -> anyhow::Result<()> {
452 let payment_hash = PaymentHash::try_from(notif.payment_hash)
453 .context("invalid payment hash in lightning receive notification")?;
454
455 debug!("Lightning receive notification: payment_hash={}", payment_hash);
456
457 match self.try_claim_lightning_receive(payment_hash, false, None).await {
458 Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
459 Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
460 }
461
462 Ok(())
463 }
464
465 async fn handle_lightning_send_finished(
470 &self,
471 notif: protos::mailbox_server::LightningSendFinishedMessage,
472 checkpoint: u64,
473 ) -> anyhow::Result<()> {
474 let payment_hash = PaymentHash::try_from(notif.payment_hash)
475 .context("invalid payment hash in lightning send finished notification")?;
476
477 let known_preimage = notif.preimage
478 .and_then(|bytes| Preimage::try_from(bytes).ok());
479
480 if known_preimage.is_some() {
481 debug!("Lightning send finished notification (success): payment_hash={}", payment_hash);
482 } else {
483 debug!("Lightning send finished notification (failed): payment_hash={}", payment_hash);
484 }
485
486 match self.is_invoice_paid(payment_hash).await {
490 Ok(true) => {
491 debug!("Lightning send {} already settled; ignoring notification", payment_hash);
492 },
493 Ok(false) => {
494 let lookup = self.lightning_send_checkpoint(payment_hash).await;
495 match lookup {
496 Ok(Some(send)) => {
497 let result = match (&send.progress, known_preimage) {
498 (Progress::PaymentInitiated(htlcs), Some(preimage)) => {
499 let htlcs = htlcs.clone();
500 self.settle_lightning_send_with_preimage(send, htlcs, preimage).await
501 },
502 (Progress::PaymentInitiated(_), None) => {
503 self.drive_action(send, DriveMode::UntilParkOrDone).await
504 },
505 _ => {
506 debug!("Lightning send finished notification for {} but checkpoint is not PaymentInitiated; ignoring", payment_hash);
507 Ok(())
508 },
509 };
510 match result {
511 Ok(()) => info!("Processed lightning send finished for {}", payment_hash),
512 Err(e) => error!("Failed to process lightning send finished for {}: {:#}", payment_hash, e),
513 }
514 },
515 Ok(None) => {
516 warn!("Lightning send finished notification for unknown payment hash {}", payment_hash);
517 },
518 Err(e) => {
519 error!("Failed to look up lightning send checkpoint for {}: {:#}", payment_hash, e);
520 },
521 }
522 },
523 Err(e) => {
524 error!("Failed to look up paid_invoice for {}: {:#}", payment_hash, e);
525 },
526 }
527
528 self.store_mailbox_checkpoint(checkpoint).await?;
529 Ok(())
530 }
531
532 pub async fn post_recovery_vtxo_ids(
534 &self,
535 vtxo_ids: impl IntoIterator<Item = VtxoId>,
536 ) -> anyhow::Result<()> {
537 let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
538 if vtxo_ids.is_empty() {
539 return Ok(());
540 }
541 let nb_vtxos = vtxo_ids.len();
542
543 let (mut srv, _) = self.require_server().await?;
544 let mailbox_id = self.recovery_mailbox_identifier().serialize();
545 let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { mailbox_id, vtxo_ids };
546
547 srv.mailbox_client.post_recovery_vtxo_ids(req).await
548 .context("error posting recovery vtxo IDs to server")?;
549
550 debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
551 Ok(())
552 }
553
554 pub async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
558 Ok(self.inner.db.get_mailbox_checkpoint().await?)
559 }
560
561 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
562 Ok(self.inner.db.store_mailbox_checkpoint(checkpoint).await?)
563 }
564}