1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9
10use anyhow::Context;
11use ark::tree::signed::UnlockHash;
12use bitcoin::hashes::Hash;
13use bitcoin::Amount;
14use bitcoin::hex::DisplayHex;
15use bitcoin::secp256k1::Keypair;
16use futures::{FutureExt, Stream, StreamExt};
17use log::{debug, error, info, trace, warn};
18use tokio_util::sync::CancellationToken;
19
20use ark::{ProtocolEncoding, Vtxo, VtxoId};
21use ark::lightning::{PaymentHash, Preimage};
22use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
23use ark::vtxo::Full;
24use server_rpc::protos;
25use server_rpc::protos::mailbox_server::MailboxMessage;
26
27use crate::Wallet;
28use crate::actions::DriveMode;
29use crate::actions::lightning::pay::Progress;
30use crate::movement::{MovementDestination, MovementStatus};
31use crate::movement::update::MovementUpdate;
32use crate::subsystem::{ArkoorMovement, Subsystem};
33
34
35const MAX_MAILBOX_REQUEST_BURST: usize = 10;
45
46impl Wallet {
47 pub fn mailbox_keypair(&self) -> Keypair {
49 self.inner.seed.to_mailbox_keypair()
50 }
51
52 pub fn recovery_mailbox_keypair(&self) -> Keypair {
54 self.inner.seed.to_recovery_mailbox_keypair()
55 }
56
57 pub fn mailbox_identifier(&self) -> MailboxIdentifier {
59 let mailbox_kp = self.mailbox_keypair();
60 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
61 }
62
63 pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
65 let mailbox_kp = self.recovery_mailbox_keypair();
66 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
67 }
68
69 pub fn mailbox_authorization(
74 &self,
75 authorization_expiry: chrono::DateTime<chrono::Local>,
76 ) -> MailboxAuthorization {
77 MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
78 }
79
80 pub async fn subscribe_mailbox_messages(
86 &self,
87 since_checkpoint: Option<u64>,
88 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
89 let (mut srv, _) = self.require_server().await?;
90
91 let checkpoint = if let Some(since) = since_checkpoint {
92 since
93 } else {
94 self.get_mailbox_checkpoint().await?
95 };
96
97 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
99 let auth = self.mailbox_authorization(expiry);
100 let mailbox_id = auth.mailbox();
101
102 let req = protos::mailbox_server::MailboxRequest {
103 unblinded_id: mailbox_id.serialize(),
104 authorization: Some(auth.serialize()),
105 checkpoint: checkpoint,
106 };
107
108 let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
109 let m = m.context("received error on mailbox message stream")?;
110 Ok::<_, anyhow::Error>(m)
111 });
112
113 Ok(stream)
114 }
115
116 pub async fn subscribe_process_mailbox_messages(
125 &self,
126 since_checkpoint: Option<u64>,
127 shutdown: CancellationToken,
128 ) -> anyhow::Result<()> {
129 let mut reconnect_count = 0;
130 const MAX_RECONNECT_ATTEMPTS: usize = 5;
131
132 'outer: loop {
133 let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
134 trace!("Connected to mailbox stream with server");
135
136 loop {
137 futures::select! {
138 message = stream.next().fuse() => {
139 match message {
140 Some(Ok(message)) => {
141 reconnect_count = 0;
142 self.process_mailbox_message(message).await;
143 },
144 Some(Err(e)) if crate::utils::is_h2_stream_error(&e) => {
148 reconnect_count = 0;
149 trace!("Mailbox stream reset by server, reconnecting: {e:#}");
150 continue 'outer;
151 },
152 Some(Err(e)) => {
153 return Err(e).context("error on mailbox message stream");
154 },
155 None if reconnect_count >= MAX_RECONNECT_ATTEMPTS => {
156 bail!("Mailbox stream dropped by server, giving up to retry later");
157 },
158 None => {
159 reconnect_count += 1;
160 warn!("Mailbox stream dropped by server, reconnecting");
161 continue 'outer;
162 },
163 }
164 },
165 _ = shutdown.cancelled().fuse() => {
166 info!("Shutdown signal received! Shutting mailbox messages process...");
167 return Ok(());
168 },
169 }
170 }
171 }
172 }
173
174 pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
176 let (mut srv, _) = self.require_server().await?;
177
178 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
180 let auth = self.mailbox_authorization(expiry);
181 let mailbox_id = auth.mailbox();
182
183 for _ in 0..MAX_MAILBOX_REQUEST_BURST {
184 let checkpoint = self.get_mailbox_checkpoint().await?;
185 let mailbox_req = protos::mailbox_server::MailboxRequest {
186 unblinded_id: mailbox_id.serialize(),
187 authorization: Some(auth.serialize()),
188 checkpoint,
189 };
190
191 let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
192 .context("error fetching mailbox")?.into_inner();
193 debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
194
195 for mailbox_msg in mailbox_resp.messages {
196 self.process_mailbox_message(mailbox_msg).await;
197 }
198
199 if !mailbox_resp.have_more {
200 break;
201 }
202 }
203
204 Ok(())
205 }
206
207 async fn process_raw_vtxos(
214 &self,
215 raw_vtxos: Vec<Vec<u8>>,
216 ) -> Vec<Vtxo<Full>> {
217 let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
218 let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
219
220 for bytes in &raw_vtxos {
221 let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
222 Ok(vtxo) => vtxo,
223 Err(e) => {
224 error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
225 invalid_vtxos.push(bytes);
226 continue;
227 }
228 };
229
230 if let Err(e) = self.validate_vtxo(&vtxo).await {
231 error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
232 invalid_vtxos.push(bytes);
233 continue;
234 }
235
236 valid_vtxos.push(vtxo);
237 }
238
239 if !invalid_vtxos.is_empty() {
241 error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
242 }
243
244 valid_vtxos
245 }
246
247 pub(crate) async fn process_mailbox_message(
248 &self,
249 mailbox_msg: MailboxMessage,
250 ) {
251 use protos::mailbox_server::mailbox_message::Message;
252
253 let advance = match mailbox_msg.message {
259 Some(Message::Arkoor(msg)) => {
260 match self.process_received_arkoor_package(msg.vtxos).await {
261 Ok(()) => true,
262 Err(e) => {
263 error!("Error processing received arkoor package: {:#}", e);
264 false
265 }
266 }
267 }
268 Some(Message::RoundParticipationCompleted(m)) => {
269 info!("Server informed that round participation is ready, unlock_hash:{:?}",
270 UnlockHash::from_slice(&m.unlock_hash).ok(),
271 );
272 if let Err(e) = self.sync_pending_rounds().await {
273 error!("Error syncing pending rounds: {:#}", e);
274 }
275 true
276 },
277 Some(Message::IncomingLightningPayment(msg)) => {
278 if let Err(e) = self.handle_lightning_receive_notification(msg).await {
279 error!("Error handling lightning receive notification: {:#}", e);
280 }
281 true
282 },
283 Some(Message::RecoveryVtxoIds(_)) => {
284 trace!("Received recovery VTXO IDs, ignoring");
285 true
286 }
287 Some(Message::LightningSendFinished(msg)) => {
288 if let Err(e) = self.handle_lightning_send_finished(msg, mailbox_msg.checkpoint).await {
289 error!("Error handling lightning send finished notification: {:#}", e);
290 }
291 true
292 }
293 None => {
294 warn!("Received unknown mailbox message kind at checkpoint {}; bark may need to be upgraded",
295 mailbox_msg.checkpoint);
296 true
297 }
298 };
299
300 if advance {
301 if let Err(e) = self.store_mailbox_checkpoint(mailbox_msg.checkpoint).await {
302 error!("Error storing mailbox checkpoint: {:#}", e);
303 }
304 }
305 }
306
307 async fn process_received_arkoor_package(
308 &self,
309 raw_vtxos: Vec<Vec<u8>>,
310 ) -> anyhow::Result<()> {
311 let vtxos = self.process_raw_vtxos(raw_vtxos).await;
312
313 let mut new_vtxos = Vec::with_capacity(vtxos.len());
314 for vtxo in &vtxos {
315 if self.inner.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
317 debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
318 continue;
319 }
320
321 trace!("Received arkoor VTXO {} for {}", vtxo.id(), vtxo.amount());
322 new_vtxos.push(vtxo);
323 }
324
325 if new_vtxos.is_empty() {
326 return Ok(());
327 }
328
329 if let Err(e) = self.register_vtxo_transactions_with_server(&new_vtxos).await {
337 warn!("Failed to register received arkoor vtxo transactions with server: {:#}", e);
338 }
339
340 let balance = vtxos
341 .iter()
342 .map(|vtxo| vtxo.amount()).sum::<Amount>()
343 .to_signed()?;
344 self.store_spendable_vtxos(&vtxos).await?;
345
346 let mut received_by_address = HashMap::<ark::Address, Amount>::new();
348 for vtxo in &vtxos {
349 if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
350 if let Ok(address) = self.peek_address(index).await {
351 *received_by_address.entry(address).or_default() += vtxo.amount();
352 }
353 }
354 }
355 let received_on: Vec<_> = received_by_address
356 .iter()
357 .map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
358 .collect();
359
360 let movement_id = self.inner.movements.new_finished_movement(
361 Subsystem::ARKOOR,
362 ArkoorMovement::Receive.to_string(),
363 MovementStatus::Successful,
364 MovementUpdate::new()
365 .produced_vtxos(&vtxos)
366 .intended_and_effective_balance(balance)
367 .received_on(received_on),
368 ).await?;
369
370 info!("Received arkoor (movement {}) for {}", movement_id, balance);
371
372 Ok(())
373 }
374
375 async fn handle_lightning_receive_notification(
380 &self,
381 notif: protos::mailbox_server::IncomingLightningPaymentMessage,
382 ) -> anyhow::Result<()> {
383 let payment_hash = PaymentHash::try_from(notif.payment_hash)
384 .context("invalid payment hash in lightning receive notification")?;
385
386 debug!("Lightning receive notification: payment_hash={}", payment_hash);
387
388 match self.try_claim_lightning_receive(payment_hash, false, None).await {
389 Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
390 Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
391 }
392
393 Ok(())
394 }
395
396 async fn handle_lightning_send_finished(
401 &self,
402 notif: protos::mailbox_server::LightningSendFinishedMessage,
403 checkpoint: u64,
404 ) -> anyhow::Result<()> {
405 let payment_hash = PaymentHash::try_from(notif.payment_hash)
406 .context("invalid payment hash in lightning send finished notification")?;
407
408 let known_preimage = notif.preimage
409 .and_then(|bytes| Preimage::try_from(bytes).ok());
410
411 if known_preimage.is_some() {
412 debug!("Lightning send finished notification (success): payment_hash={}", payment_hash);
413 } else {
414 debug!("Lightning send finished notification (failed): payment_hash={}", payment_hash);
415 }
416
417 match self.is_invoice_paid(payment_hash).await {
421 Ok(true) => {
422 debug!("Lightning send {} already settled; ignoring notification", payment_hash);
423 },
424 Ok(false) => {
425 let lookup = self.lightning_send_checkpoint(payment_hash).await;
426 match lookup {
427 Ok(Some(send)) => {
428 let result = match (&send.progress, known_preimage) {
429 (Progress::PaymentInitiated(htlcs), Some(preimage)) => {
430 let htlcs = htlcs.clone();
431 self.settle_lightning_send_with_preimage(send, htlcs, preimage).await
432 },
433 (Progress::PaymentInitiated(_), None) => {
434 self.drive_action(send, DriveMode::UntilParkOrDone).await
435 },
436 _ => {
437 debug!("Lightning send finished notification for {} but checkpoint is not PaymentInitiated; ignoring", payment_hash);
438 Ok(())
439 },
440 };
441 match result {
442 Ok(()) => info!("Processed lightning send finished for {}", payment_hash),
443 Err(e) => error!("Failed to process lightning send finished for {}: {:#}", payment_hash, e),
444 }
445 },
446 Ok(None) => {
447 warn!("Lightning send finished notification for unknown payment hash {}", payment_hash);
448 },
449 Err(e) => {
450 error!("Failed to look up lightning send checkpoint for {}: {:#}", payment_hash, e);
451 },
452 }
453 },
454 Err(e) => {
455 error!("Failed to look up paid_invoice for {}: {:#}", payment_hash, e);
456 },
457 }
458
459 self.store_mailbox_checkpoint(checkpoint).await?;
460 Ok(())
461 }
462
463 pub async fn post_recovery_vtxo_ids(
465 &self,
466 vtxo_ids: impl IntoIterator<Item = VtxoId>,
467 ) -> anyhow::Result<()> {
468 let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
469 if vtxo_ids.is_empty() {
470 return Ok(());
471 }
472 let nb_vtxos = vtxo_ids.len();
473
474 let (mut srv, _) = self.require_server().await?;
475 let unblinded_id = self.recovery_mailbox_identifier().serialize();
476 let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };
477
478 srv.mailbox_client.post_recovery_vtxo_ids(req).await
479 .context("error posting recovery vtxo IDs to server")?;
480
481 debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
482 Ok(())
483 }
484
485 pub async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
489 Ok(self.inner.db.get_mailbox_checkpoint().await?)
490 }
491
492 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
493 Ok(self.inner.db.store_mailbox_checkpoint(checkpoint).await?)
494 }
495}