1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9
10use anyhow::Context;
11use ark::tree::signed::UnlockHash;
12use bitcoin::hashes::Hash;
13use bitcoin::Amount;
14use bitcoin::hex::DisplayHex;
15use bitcoin::secp256k1::Keypair;
16use futures::{FutureExt, Stream, StreamExt};
17use log::{debug, error, info, trace, warn};
18use tokio_util::sync::CancellationToken;
19
20use ark::{ProtocolEncoding, Vtxo, VtxoId};
21use ark::lightning::PaymentHash;
22use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
23use ark::vtxo::Full;
24use server_rpc::protos;
25use server_rpc::protos::mailbox_server::MailboxMessage;
26
27use crate::Wallet;
28use crate::movement::{MovementDestination, MovementStatus};
29use crate::movement::update::MovementUpdate;
30use crate::subsystem::{ArkoorMovement, Subsystem};
31
32
33const MAX_MAILBOX_REQUEST_BURST: usize = 10;
43
44impl Wallet {
45 pub fn mailbox_keypair(&self) -> Keypair {
47 self.seed.to_mailbox_keypair()
48 }
49
50 pub fn recovery_mailbox_keypair(&self) -> Keypair {
52 self.seed.to_recovery_mailbox_keypair()
53 }
54
55 pub fn mailbox_identifier(&self) -> MailboxIdentifier {
57 let mailbox_kp = self.mailbox_keypair();
58 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
59 }
60
61 pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
63 let mailbox_kp = self.recovery_mailbox_keypair();
64 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
65 }
66
67 pub fn mailbox_authorization(
72 &self,
73 authorization_expiry: chrono::DateTime<chrono::Local>,
74 ) -> MailboxAuthorization {
75 MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
76 }
77
78 pub async fn subscribe_mailbox_messages(
84 &self,
85 since_checkpoint: Option<u64>,
86 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
87 let (mut srv, _) = self.require_server().await?;
88
89 let checkpoint = if let Some(since) = since_checkpoint {
90 since
91 } else {
92 self.get_mailbox_checkpoint().await?
93 };
94
95 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
97 let auth = self.mailbox_authorization(expiry);
98 let mailbox_id = auth.mailbox();
99
100 let req = protos::mailbox_server::MailboxRequest {
101 unblinded_id: mailbox_id.to_vec(),
102 authorization: Some(auth.serialize()),
103 checkpoint: checkpoint,
104 };
105
106 let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
107 let m = m.context("received error on mailbox message stream")?;
108 Ok::<_, anyhow::Error>(m)
109 });
110
111 Ok(stream)
112 }
113
114 pub async fn subscribe_process_mailbox_messages(
123 &self,
124 since_checkpoint: Option<u64>,
125 shutdown: CancellationToken,
126 ) -> anyhow::Result<()> {
127 let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
128
129 loop {
130 futures::select! {
131 message = stream.next().fuse() => {
132 if let Some(message) = message {
133 let message = message.context("error on mailbox message stream")?;
134 self.process_mailbox_message(message).await;
135 }
136 },
137 _ = shutdown.cancelled().fuse() => {
138 info!("Shutdown signal received! Shutting mailbox messages process...");
139 return Ok(());
140 },
141 }
142 }
143 }
144
145 pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
147 let (mut srv, _) = self.require_server().await?;
148
149 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
151 let auth = self.mailbox_authorization(expiry);
152 let mailbox_id = auth.mailbox();
153
154 for _ in 0..MAX_MAILBOX_REQUEST_BURST {
155 let checkpoint = self.get_mailbox_checkpoint().await?;
156 let mailbox_req = protos::mailbox_server::MailboxRequest {
157 unblinded_id: mailbox_id.to_vec(),
158 authorization: Some(auth.serialize()),
159 checkpoint,
160 };
161
162 let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
163 .context("error fetching mailbox")?.into_inner();
164 debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
165
166 for mailbox_msg in mailbox_resp.messages {
167 self.process_mailbox_message(mailbox_msg).await;
168 }
169
170 if !mailbox_resp.have_more {
171 break;
172 }
173 }
174
175 Ok(())
176 }
177
178 async fn process_raw_vtxos(
185 &self,
186 raw_vtxos: Vec<Vec<u8>>,
187 ) -> Vec<Vtxo<Full>> {
188 let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
189 let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
190
191 for bytes in &raw_vtxos {
192 let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
193 Ok(vtxo) => vtxo,
194 Err(e) => {
195 error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
196 invalid_vtxos.push(bytes);
197 continue;
198 }
199 };
200
201 if let Err(e) = self.validate_vtxo(&vtxo).await {
202 error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
203 invalid_vtxos.push(bytes);
204 continue;
205 }
206
207 valid_vtxos.push(vtxo);
208 }
209
210 if !invalid_vtxos.is_empty() {
212 error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
213 }
214
215 valid_vtxos
216 }
217
218 pub(crate) async fn process_mailbox_message(
219 &self,
220 mailbox_msg: MailboxMessage,
221 ) {
222 use protos::mailbox_server::mailbox_message::Message;
223
224 match mailbox_msg.message {
225 Some(Message::Arkoor(msg)) => {
226 let result = self
227 .process_received_arkoor_package(msg.vtxos, Some(mailbox_msg.checkpoint)).await;
228 if let Err(e) = result {
229 error!("Error processing received arkoor package: {:#}", e);
230 }
231 }
232 Some(Message::RoundParticipationCompleted(m)) => {
233 info!("Server informed that round participation is ready, unlock_hash:{:?}",
236 UnlockHash::from_slice(&m.unlock_hash).ok(),
237 );
238 if let Err(e) = self.sync_pending_rounds().await {
239 error!("Error syncing pending rounds: {:#}", e);
240 }
241 },
242 Some(Message::IncomingLightningPayment(msg)) => {
243 if let Err(e) = self.handle_lightning_receive_notification(msg, mailbox_msg.checkpoint).await {
244 error!("Error handling lightning receive notification: {:#}", e);
245 }
246 },
247 Some(Message::RecoveryVtxoIds(_)) => {
248 trace!("Received recovery VTXO IDs, ignoring");
249 }
250 None => {
251 warn!("Received unknown mailbox message, ignoring");
252 }
253 }
254 }
255
256 async fn process_received_arkoor_package(
257 &self,
258 raw_vtxos: Vec<Vec<u8>>,
259 checkpoint: Option<u64>,
260 ) -> anyhow::Result<()> {
261 let vtxos = self.process_raw_vtxos(raw_vtxos).await;
262
263 let mut new_vtxos = Vec::with_capacity(vtxos.len());
264 for vtxo in &vtxos {
265 if self.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
267 debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
268 continue;
269 }
270
271 trace!("Received arkoor VTXO {} for {} (checkpoint {:?})", vtxo.id(), vtxo.amount(), checkpoint);
272 new_vtxos.push(vtxo);
273 }
274
275 if new_vtxos.is_empty() {
276 return Ok(());
277 }
278
279 let balance = vtxos
280 .iter()
281 .map(|vtxo| vtxo.amount()).sum::<Amount>()
282 .to_signed()?;
283 self.store_spendable_vtxos(&vtxos).await?;
284
285 let mut received_by_address = HashMap::<ark::Address, Amount>::new();
287 for vtxo in &vtxos {
288 if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
289 if let Ok(address) = self.peek_address(index).await {
290 *received_by_address.entry(address).or_default() += vtxo.amount();
291 }
292 }
293 }
294 let received_on: Vec<_> = received_by_address
295 .iter()
296 .map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
297 .collect();
298
299 let movement_id = self.movements.new_finished_movement(
300 Subsystem::ARKOOR,
301 ArkoorMovement::Receive.to_string(),
302 MovementStatus::Successful,
303 MovementUpdate::new()
304 .produced_vtxos(&vtxos)
305 .intended_and_effective_balance(balance)
306 .received_on(received_on),
307 ).await?;
308
309 info!("Received arkoor (movement {}) for {}", movement_id, balance);
310
311 if let Some(checkpoint) = checkpoint {
312 self.store_mailbox_checkpoint(checkpoint).await?;
313 }
314
315 Ok(())
316 }
317
318 async fn handle_lightning_receive_notification(
323 &self,
324 notif: protos::mailbox_server::IncomingLightningPaymentMessage,
325 checkpoint: u64,
326 ) -> anyhow::Result<()> {
327 let payment_hash = PaymentHash::try_from(notif.payment_hash)
328 .context("invalid payment hash in lightning receive notification")?;
329
330 debug!("Lightning receive notification: payment_hash={}", payment_hash);
331
332 match self.try_claim_lightning_receive(payment_hash, false, None).await {
333 Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
334 Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
335 }
336
337 self.store_mailbox_checkpoint(checkpoint).await?;
338 Ok(())
339 }
340
341 pub async fn post_recovery_vtxo_ids(
343 &self,
344 vtxo_ids: impl IntoIterator<Item = VtxoId>,
345 ) -> anyhow::Result<()> {
346 let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
347 if vtxo_ids.is_empty() {
348 return Ok(());
349 }
350 let nb_vtxos = vtxo_ids.len();
351
352 let (mut srv, _) = self.require_server().await?;
353 let unblinded_id = self.recovery_mailbox_identifier().to_vec();
354 let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };
355
356 srv.mailbox_client.post_recovery_vtxo_ids(req).await
357 .context("error posting recovery vtxo IDs to server")?;
358
359 debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
360 Ok(())
361 }
362
363 async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
364 Ok(self.db.get_mailbox_checkpoint().await?)
365 }
366
367 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
368 Ok(self.db.store_mailbox_checkpoint(checkpoint).await?)
369 }
370}