1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9
10use anyhow::Context;
11use ark::tree::signed::UnlockHash;
12use bitcoin::hashes::Hash;
13use bitcoin::Amount;
14use bitcoin::hex::DisplayHex;
15use bitcoin::secp256k1::Keypair;
16use futures::{FutureExt, Stream, StreamExt};
17use log::{debug, error, info, trace, warn};
18use tokio_util::sync::CancellationToken;
19
20use ark::{ProtocolEncoding, Vtxo, VtxoId};
21use ark::lightning::{PaymentHash, Preimage};
22use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
23use ark::vtxo::Full;
24use server_rpc::protos;
25use server_rpc::protos::mailbox_server::MailboxMessage;
26
27use crate::Wallet;
28use crate::movement::{MovementDestination, MovementStatus};
29use crate::movement::update::MovementUpdate;
30use crate::subsystem::{ArkoorMovement, Subsystem};
31use crate::utils::time::Instant;
32
33
34const MAX_MAILBOX_REQUEST_BURST: usize = 10;
44
45impl Wallet {
46 pub fn mailbox_keypair(&self) -> Keypair {
48 self.inner.seed.to_mailbox_keypair()
49 }
50
51 pub fn recovery_mailbox_keypair(&self) -> Keypair {
53 self.inner.seed.to_recovery_mailbox_keypair()
54 }
55
56 pub fn mailbox_identifier(&self) -> MailboxIdentifier {
58 let mailbox_kp = self.mailbox_keypair();
59 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
60 }
61
62 pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
64 let mailbox_kp = self.recovery_mailbox_keypair();
65 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
66 }
67
68 pub fn mailbox_authorization(
73 &self,
74 authorization_expiry: chrono::DateTime<chrono::Local>,
75 ) -> MailboxAuthorization {
76 MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
77 }
78
79 pub async fn subscribe_mailbox_messages(
85 &self,
86 since_checkpoint: Option<u64>,
87 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
88 let (mut srv, _) = self.require_server().await?;
89
90 let checkpoint = if let Some(since) = since_checkpoint {
91 since
92 } else {
93 self.get_mailbox_checkpoint().await?
94 };
95
96 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
98 let auth = self.mailbox_authorization(expiry);
99 let mailbox_id = auth.mailbox();
100
101 let req = protos::mailbox_server::MailboxRequest {
102 unblinded_id: mailbox_id.serialize(),
103 authorization: Some(auth.serialize()),
104 checkpoint: checkpoint,
105 };
106
107 let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
108 let m = m.context("received error on mailbox message stream")?;
109 Ok::<_, anyhow::Error>(m)
110 });
111
112 Ok(stream)
113 }
114
115 pub async fn subscribe_process_mailbox_messages(
124 &self,
125 since_checkpoint: Option<u64>,
126 shutdown: CancellationToken,
127 ) -> anyhow::Result<()> {
128 let mut reconnect_count = 0;
129 const MAX_RECONNECT_ATTEMPTS: usize = 5;
130
131 'outer: loop {
132 let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
133 let connected_at = Instant::now();
134 trace!("Connected to mailbox stream with server");
135
136 loop {
137 futures::select! {
138 message = stream.next().fuse() => {
139 if let Some(message) = message {
140 reconnect_count = 0;
141 let message = message.context("error on mailbox message stream")?;
142 self.process_mailbox_message(message).await;
143 } else if connected_at.elapsed() >= crate::HEALTHY_STREAM_DURATION {
144 reconnect_count = 0;
147 info!("Mailbox stream closed after healthy session, reconnecting");
148 continue 'outer;
149 } else if reconnect_count >= MAX_RECONNECT_ATTEMPTS {
150 bail!("Mailbox stream dropped by server, giving up to retry later");
151 } else {
152 reconnect_count += 1;
153 warn!("Mailbox stream dropped by server, reconnecting");
154 continue 'outer;
155 }
156 },
157 _ = shutdown.cancelled().fuse() => {
158 info!("Shutdown signal received! Shutting mailbox messages process...");
159 return Ok(());
160 },
161 }
162 }
163 }
164 }
165
166 pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
168 let (mut srv, _) = self.require_server().await?;
169
170 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
172 let auth = self.mailbox_authorization(expiry);
173 let mailbox_id = auth.mailbox();
174
175 for _ in 0..MAX_MAILBOX_REQUEST_BURST {
176 let checkpoint = self.get_mailbox_checkpoint().await?;
177 let mailbox_req = protos::mailbox_server::MailboxRequest {
178 unblinded_id: mailbox_id.serialize(),
179 authorization: Some(auth.serialize()),
180 checkpoint,
181 };
182
183 let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
184 .context("error fetching mailbox")?.into_inner();
185 debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
186
187 for mailbox_msg in mailbox_resp.messages {
188 self.process_mailbox_message(mailbox_msg).await;
189 }
190
191 if !mailbox_resp.have_more {
192 break;
193 }
194 }
195
196 Ok(())
197 }
198
199 async fn process_raw_vtxos(
206 &self,
207 raw_vtxos: Vec<Vec<u8>>,
208 ) -> Vec<Vtxo<Full>> {
209 let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
210 let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
211
212 for bytes in &raw_vtxos {
213 let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
214 Ok(vtxo) => vtxo,
215 Err(e) => {
216 error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
217 invalid_vtxos.push(bytes);
218 continue;
219 }
220 };
221
222 if let Err(e) = self.validate_vtxo(&vtxo).await {
223 error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
224 invalid_vtxos.push(bytes);
225 continue;
226 }
227
228 valid_vtxos.push(vtxo);
229 }
230
231 if !invalid_vtxos.is_empty() {
233 error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
234 }
235
236 valid_vtxos
237 }
238
239 pub(crate) async fn process_mailbox_message(
240 &self,
241 mailbox_msg: MailboxMessage,
242 ) {
243 use protos::mailbox_server::mailbox_message::Message;
244
245 let advance = match mailbox_msg.message {
251 Some(Message::Arkoor(msg)) => {
252 match self.process_received_arkoor_package(msg.vtxos).await {
253 Ok(()) => true,
254 Err(e) => {
255 error!("Error processing received arkoor package: {:#}", e);
256 false
257 }
258 }
259 }
260 Some(Message::RoundParticipationCompleted(m)) => {
261 info!("Server informed that round participation is ready, unlock_hash:{:?}",
262 UnlockHash::from_slice(&m.unlock_hash).ok(),
263 );
264 if let Err(e) = self.sync_pending_rounds().await {
265 error!("Error syncing pending rounds: {:#}", e);
266 }
267 true
268 },
269 Some(Message::IncomingLightningPayment(msg)) => {
270 if let Err(e) = self.handle_lightning_receive_notification(msg).await {
271 error!("Error handling lightning receive notification: {:#}", e);
272 }
273 true
274 },
275 Some(Message::RecoveryVtxoIds(_)) => {
276 trace!("Received recovery VTXO IDs, ignoring");
277 true
278 }
279 Some(Message::LightningSendFinished(msg)) => {
280 if let Err(e) = self.handle_lightning_send_finished(msg, mailbox_msg.checkpoint).await {
281 error!("Error handling lightning send finished notification: {:#}", e);
282 }
283 true
284 }
285 None => {
286 warn!("Received unknown mailbox message kind at checkpoint {}; bark may need to be upgraded",
287 mailbox_msg.checkpoint);
288 true
289 }
290 };
291
292 if advance {
293 if let Err(e) = self.store_mailbox_checkpoint(mailbox_msg.checkpoint).await {
294 error!("Error storing mailbox checkpoint: {:#}", e);
295 }
296 }
297 }
298
299 async fn process_received_arkoor_package(
300 &self,
301 raw_vtxos: Vec<Vec<u8>>,
302 ) -> anyhow::Result<()> {
303 let vtxos = self.process_raw_vtxos(raw_vtxos).await;
304
305 let mut new_vtxos = Vec::with_capacity(vtxos.len());
306 for vtxo in &vtxos {
307 if self.inner.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
309 debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
310 continue;
311 }
312
313 trace!("Received arkoor VTXO {} for {}", vtxo.id(), vtxo.amount());
314 new_vtxos.push(vtxo);
315 }
316
317 if new_vtxos.is_empty() {
318 return Ok(());
319 }
320
321 if let Err(e) = self.register_vtxo_transactions_with_server(&new_vtxos).await {
329 warn!("Failed to register received arkoor vtxo transactions with server: {:#}", e);
330 }
331
332 let balance = vtxos
333 .iter()
334 .map(|vtxo| vtxo.amount()).sum::<Amount>()
335 .to_signed()?;
336 self.store_spendable_vtxos(&vtxos).await?;
337
338 let mut received_by_address = HashMap::<ark::Address, Amount>::new();
340 for vtxo in &vtxos {
341 if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
342 if let Ok(address) = self.peek_address(index).await {
343 *received_by_address.entry(address).or_default() += vtxo.amount();
344 }
345 }
346 }
347 let received_on: Vec<_> = received_by_address
348 .iter()
349 .map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
350 .collect();
351
352 let movement_id = self.inner.movements.new_finished_movement(
353 Subsystem::ARKOOR,
354 ArkoorMovement::Receive.to_string(),
355 MovementStatus::Successful,
356 MovementUpdate::new()
357 .produced_vtxos(&vtxos)
358 .intended_and_effective_balance(balance)
359 .received_on(received_on),
360 ).await?;
361
362 info!("Received arkoor (movement {}) for {}", movement_id, balance);
363
364 Ok(())
365 }
366
367 async fn handle_lightning_receive_notification(
372 &self,
373 notif: protos::mailbox_server::IncomingLightningPaymentMessage,
374 ) -> anyhow::Result<()> {
375 let payment_hash = PaymentHash::try_from(notif.payment_hash)
376 .context("invalid payment hash in lightning receive notification")?;
377
378 debug!("Lightning receive notification: payment_hash={}", payment_hash);
379
380 match self.try_claim_lightning_receive(payment_hash, false, None).await {
381 Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
382 Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
383 }
384
385 Ok(())
386 }
387
388 async fn handle_lightning_send_finished(
393 &self,
394 notif: protos::mailbox_server::LightningSendFinishedMessage,
395 checkpoint: u64,
396 ) -> anyhow::Result<()> {
397 let payment_hash = PaymentHash::try_from(notif.payment_hash)
398 .context("invalid payment hash in lightning send finished notification")?;
399
400 let known_preimage = notif.preimage
401 .and_then(|bytes| Preimage::try_from(bytes).ok());
402
403 if known_preimage.is_some() {
404 debug!("Lightning send finished notification (success): payment_hash={}", payment_hash);
405 } else {
406 debug!("Lightning send finished notification (failed): payment_hash={}", payment_hash);
407 }
408
409 match self.check_lightning_payment_with_preimage(payment_hash, known_preimage).await {
414 Ok(_) => info!("Processed lightning send finished for {}", payment_hash),
415 Err(e) => error!("Failed to process lightning send finished for {}: {:#}", payment_hash, e),
416 }
417
418 self.store_mailbox_checkpoint(checkpoint).await?;
419 Ok(())
420 }
421
422 pub async fn post_recovery_vtxo_ids(
424 &self,
425 vtxo_ids: impl IntoIterator<Item = VtxoId>,
426 ) -> anyhow::Result<()> {
427 let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
428 if vtxo_ids.is_empty() {
429 return Ok(());
430 }
431 let nb_vtxos = vtxo_ids.len();
432
433 let (mut srv, _) = self.require_server().await?;
434 let unblinded_id = self.recovery_mailbox_identifier().serialize();
435 let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };
436
437 srv.mailbox_client.post_recovery_vtxo_ids(req).await
438 .context("error posting recovery vtxo IDs to server")?;
439
440 debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
441 Ok(())
442 }
443
444 pub async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
448 Ok(self.inner.db.get_mailbox_checkpoint().await?)
449 }
450
451 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
452 Ok(self.inner.db.store_mailbox_checkpoint(checkpoint).await?)
453 }
454}