1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9
10use anyhow::Context;
11use ark::tree::signed::UnlockHash;
12use bitcoin::hashes::Hash;
13use bitcoin::Amount;
14use bitcoin::hex::DisplayHex;
15use bitcoin::secp256k1::Keypair;
16use futures::{FutureExt, Stream, StreamExt};
17use log::{debug, error, info, trace, warn};
18use tokio_util::sync::CancellationToken;
19
20use ark::{ProtocolEncoding, Vtxo, VtxoId};
21use ark::lightning::PaymentHash;
22use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
23use ark::vtxo::Full;
24use server_rpc::protos;
25use server_rpc::protos::mailbox_server::MailboxMessage;
26
27use crate::Wallet;
28use crate::movement::{MovementDestination, MovementStatus};
29use crate::movement::update::MovementUpdate;
30use crate::subsystem::{ArkoorMovement, Subsystem};
31
32
/// Upper bound on the number of consecutive `read_mailbox` requests that a single
/// call to `Wallet::sync_mailbox` will issue while the server keeps reporting
/// that more messages are available.
const MAX_MAILBOX_REQUEST_BURST: usize = 10;
43
44impl Wallet {
45 pub fn mailbox_keypair(&self) -> Keypair {
47 self.seed.to_mailbox_keypair()
48 }
49
50 pub fn recovery_mailbox_keypair(&self) -> Keypair {
52 self.seed.to_recovery_mailbox_keypair()
53 }
54
55 pub fn mailbox_identifier(&self) -> MailboxIdentifier {
57 let mailbox_kp = self.mailbox_keypair();
58 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
59 }
60
61 pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
63 let mailbox_kp = self.recovery_mailbox_keypair();
64 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
65 }
66
67 pub fn mailbox_authorization(
72 &self,
73 authorization_expiry: chrono::DateTime<chrono::Local>,
74 ) -> MailboxAuthorization {
75 MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
76 }
77
78 pub async fn subscribe_mailbox_messages(
84 &self,
85 since_checkpoint: Option<u64>,
86 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
87 let (mut srv, _) = self.require_server().await?;
88
89 let checkpoint = if let Some(since) = since_checkpoint {
90 since
91 } else {
92 self.get_mailbox_checkpoint().await?
93 };
94
95 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
97 let auth = self.mailbox_authorization(expiry);
98 let mailbox_id = auth.mailbox();
99
100 let req = protos::mailbox_server::MailboxRequest {
101 unblinded_id: mailbox_id.to_vec(),
102 authorization: Some(auth.serialize()),
103 checkpoint: checkpoint,
104 };
105
106 let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
107 let m = m.context("received error on mailbox message stream")?;
108 Ok::<_, anyhow::Error>(m)
109 });
110
111 Ok(stream)
112 }
113
114 pub async fn subscribe_process_mailbox_messages(
123 &self,
124 since_checkpoint: Option<u64>,
125 shutdown: CancellationToken,
126 ) -> anyhow::Result<()> {
127 let mut reconnect_count = 0;
128 const MAX_RECONNECT_ATTEMPTS: usize = 5;
129
130 'outer: loop {
131 let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
132 trace!("Connected to mailbox stream with server");
133
134 loop {
135 futures::select! {
136 message = stream.next().fuse() => {
137 if let Some(message) = message {
138 reconnect_count = 0;
139 let message = message.context("error on mailbox message stream")?;
140 self.process_mailbox_message(message).await;
141 } else if reconnect_count >= MAX_RECONNECT_ATTEMPTS {
142 bail!("Mailbox stream dropped by server, giving up to retry later");
143 } else {
144 reconnect_count += 1;
145 warn!("Mailbox stream dropped by server, reconnecting");
146 continue 'outer;
147 }
148 },
149 _ = shutdown.cancelled().fuse() => {
150 info!("Shutdown signal received! Shutting mailbox messages process...");
151 return Ok(());
152 },
153 }
154 }
155 }
156 }
157
158 pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
160 let (mut srv, _) = self.require_server().await?;
161
162 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
164 let auth = self.mailbox_authorization(expiry);
165 let mailbox_id = auth.mailbox();
166
167 for _ in 0..MAX_MAILBOX_REQUEST_BURST {
168 let checkpoint = self.get_mailbox_checkpoint().await?;
169 let mailbox_req = protos::mailbox_server::MailboxRequest {
170 unblinded_id: mailbox_id.to_vec(),
171 authorization: Some(auth.serialize()),
172 checkpoint,
173 };
174
175 let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
176 .context("error fetching mailbox")?.into_inner();
177 debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
178
179 for mailbox_msg in mailbox_resp.messages {
180 self.process_mailbox_message(mailbox_msg).await;
181 }
182
183 if !mailbox_resp.have_more {
184 break;
185 }
186 }
187
188 Ok(())
189 }
190
191 async fn process_raw_vtxos(
198 &self,
199 raw_vtxos: Vec<Vec<u8>>,
200 ) -> Vec<Vtxo<Full>> {
201 let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
202 let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
203
204 for bytes in &raw_vtxos {
205 let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
206 Ok(vtxo) => vtxo,
207 Err(e) => {
208 error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
209 invalid_vtxos.push(bytes);
210 continue;
211 }
212 };
213
214 if let Err(e) = self.validate_vtxo(&vtxo).await {
215 error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
216 invalid_vtxos.push(bytes);
217 continue;
218 }
219
220 valid_vtxos.push(vtxo);
221 }
222
223 if !invalid_vtxos.is_empty() {
225 error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
226 }
227
228 valid_vtxos
229 }
230
231 pub(crate) async fn process_mailbox_message(
232 &self,
233 mailbox_msg: MailboxMessage,
234 ) {
235 use protos::mailbox_server::mailbox_message::Message;
236
237 match mailbox_msg.message {
238 Some(Message::Arkoor(msg)) => {
239 let result = self
240 .process_received_arkoor_package(msg.vtxos, Some(mailbox_msg.checkpoint)).await;
241 if let Err(e) = result {
242 error!("Error processing received arkoor package: {:#}", e);
243 }
244 }
245 Some(Message::RoundParticipationCompleted(m)) => {
246 info!("Server informed that round participation is ready, unlock_hash:{:?}",
249 UnlockHash::from_slice(&m.unlock_hash).ok(),
250 );
251 if let Err(e) = self.sync_pending_rounds().await {
252 error!("Error syncing pending rounds: {:#}", e);
253 }
254 },
255 Some(Message::IncomingLightningPayment(msg)) => {
256 if let Err(e) = self.handle_lightning_receive_notification(msg, mailbox_msg.checkpoint).await {
257 error!("Error handling lightning receive notification: {:#}", e);
258 }
259 },
260 Some(Message::RecoveryVtxoIds(_)) => {
261 trace!("Received recovery VTXO IDs, ignoring");
262 }
263 None => {
264 warn!("Received unknown mailbox message, ignoring");
265 }
266 }
267 }
268
269 async fn process_received_arkoor_package(
270 &self,
271 raw_vtxos: Vec<Vec<u8>>,
272 checkpoint: Option<u64>,
273 ) -> anyhow::Result<()> {
274 let vtxos = self.process_raw_vtxos(raw_vtxos).await;
275
276 let mut new_vtxos = Vec::with_capacity(vtxos.len());
277 for vtxo in &vtxos {
278 if self.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
280 debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
281 continue;
282 }
283
284 trace!("Received arkoor VTXO {} for {} (checkpoint {:?})", vtxo.id(), vtxo.amount(), checkpoint);
285 new_vtxos.push(vtxo);
286 }
287
288 if new_vtxos.is_empty() {
289 return Ok(());
290 }
291
292 if let Err(e) = self.register_vtxo_transactions_with_server(&new_vtxos).await {
300 warn!("Failed to register received arkoor vtxo transactions with server: {:#}", e);
301 }
302
303 let balance = vtxos
304 .iter()
305 .map(|vtxo| vtxo.amount()).sum::<Amount>()
306 .to_signed()?;
307 self.store_spendable_vtxos(&vtxos).await?;
308
309 let mut received_by_address = HashMap::<ark::Address, Amount>::new();
311 for vtxo in &vtxos {
312 if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
313 if let Ok(address) = self.peek_address(index).await {
314 *received_by_address.entry(address).or_default() += vtxo.amount();
315 }
316 }
317 }
318 let received_on: Vec<_> = received_by_address
319 .iter()
320 .map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
321 .collect();
322
323 let movement_id = self.movements.new_finished_movement(
324 Subsystem::ARKOOR,
325 ArkoorMovement::Receive.to_string(),
326 MovementStatus::Successful,
327 MovementUpdate::new()
328 .produced_vtxos(&vtxos)
329 .intended_and_effective_balance(balance)
330 .received_on(received_on),
331 ).await?;
332
333 info!("Received arkoor (movement {}) for {}", movement_id, balance);
334
335 if let Some(checkpoint) = checkpoint {
336 self.store_mailbox_checkpoint(checkpoint).await?;
337 }
338
339 Ok(())
340 }
341
342 async fn handle_lightning_receive_notification(
347 &self,
348 notif: protos::mailbox_server::IncomingLightningPaymentMessage,
349 checkpoint: u64,
350 ) -> anyhow::Result<()> {
351 let payment_hash = PaymentHash::try_from(notif.payment_hash)
352 .context("invalid payment hash in lightning receive notification")?;
353
354 debug!("Lightning receive notification: payment_hash={}", payment_hash);
355
356 match self.try_claim_lightning_receive(payment_hash, false, None).await {
357 Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
358 Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
359 }
360
361 self.store_mailbox_checkpoint(checkpoint).await?;
362 Ok(())
363 }
364
365 pub async fn post_recovery_vtxo_ids(
367 &self,
368 vtxo_ids: impl IntoIterator<Item = VtxoId>,
369 ) -> anyhow::Result<()> {
370 let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
371 if vtxo_ids.is_empty() {
372 return Ok(());
373 }
374 let nb_vtxos = vtxo_ids.len();
375
376 let (mut srv, _) = self.require_server().await?;
377 let unblinded_id = self.recovery_mailbox_identifier().to_vec();
378 let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };
379
380 srv.mailbox_client.post_recovery_vtxo_ids(req).await
381 .context("error posting recovery vtxo IDs to server")?;
382
383 debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
384 Ok(())
385 }
386
387 pub async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
391 Ok(self.db.get_mailbox_checkpoint().await?)
392 }
393
394 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
395 Ok(self.db.store_mailbox_checkpoint(checkpoint).await?)
396 }
397}