1
2pub extern crate ark;
3
4pub extern crate bip39;
5pub extern crate lightning_invoice;
6pub extern crate lnurl as lnurllib;
7
8use std::collections::HashMap;
9
10use anyhow::Context;
11use bitcoin::Amount;
12use bitcoin::hex::DisplayHex;
13use bitcoin::secp256k1::Keypair;
14use futures::{Stream, StreamExt};
15use log::{debug, error, info, trace, warn};
16
17use ark::{ProtocolEncoding, Vtxo, VtxoId};
18use ark::lightning::PaymentHash;
19use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
20use ark::vtxo::Full;
21use server_rpc::protos;
22use server_rpc::protos::mailbox_server::MailboxMessage;
23
24use crate::Wallet;
25use crate::movement::{MovementDestination, MovementStatus};
26use crate::movement::update::MovementUpdate;
27use crate::subsystem::{ArkoorMovement, Subsystem};
28
29
/// Maximum number of consecutive `read_mailbox` requests issued by a single
/// call to `Wallet::sync_mailbox` before it stops, even if the server reports
/// that more messages are available.
const MAX_MAILBOX_REQUEST_BURST: usize = 10;
40
41impl Wallet {
42 pub fn mailbox_keypair(&self) -> Keypair {
44 self.seed.to_mailbox_keypair()
45 }
46
47 pub fn recovery_mailbox_keypair(&self) -> Keypair {
49 self.seed.to_recovery_mailbox_keypair()
50 }
51
52 pub fn mailbox_identifier(&self) -> MailboxIdentifier {
54 let mailbox_kp = self.mailbox_keypair();
55 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
56 }
57
58 pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
60 let mailbox_kp = self.recovery_mailbox_keypair();
61 MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
62 }
63
64 pub fn mailbox_authorization(
69 &self,
70 authorization_expiry: chrono::DateTime<chrono::Local>,
71 ) -> MailboxAuthorization {
72 MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
73 }
74
75 pub async fn subscribe_mailbox_messages(
81 &self,
82 since_checkpoint: Option<u64>,
83 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
84 let (mut srv, _) = self.require_server().await?;
85
86 let checkpoint = if let Some(since) = since_checkpoint {
87 since
88 } else {
89 self.get_mailbox_checkpoint().await?
90 };
91
92 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
94 let auth = self.mailbox_authorization(expiry);
95 let mailbox_id = auth.mailbox();
96
97 let req = protos::mailbox_server::MailboxRequest {
98 unblinded_id: mailbox_id.to_vec(),
99 authorization: Some(auth.serialize()),
100 checkpoint: checkpoint,
101 };
102
103 let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
104 let m = m.context("received error on mailbox message stream")?;
105 Ok::<_, anyhow::Error>(m)
106 });
107
108 Ok(stream)
109 }
110
111 pub async fn subscribe_process_mailbox_messages(
117 &self,
118 since_checkpoint: Option<u64>,
119 ) -> anyhow::Result<()> {
120 let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
121 while let Some(message) = stream.next().await {
122 let message = if let Ok(message) = message {
123 message
124 } else {
125 error!("Error receiving mailbox message: {:#}", message.unwrap_err());
126 continue;
127 };
128
129 self.process_mailbox_message(message).await;
130 }
131
132 Ok(())
133 }
134
135 pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
137 let (mut srv, _) = self.require_server().await?;
138
139 let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
141 let auth = self.mailbox_authorization(expiry);
142 let mailbox_id = auth.mailbox();
143
144 for _ in 0..MAX_MAILBOX_REQUEST_BURST {
145 let checkpoint = self.get_mailbox_checkpoint().await?;
146 let mailbox_req = protos::mailbox_server::MailboxRequest {
147 unblinded_id: mailbox_id.to_vec(),
148 authorization: Some(auth.serialize()),
149 checkpoint,
150 };
151
152 let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
153 .context("error fetching mailbox")?.into_inner();
154 debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());
155
156 for mailbox_msg in mailbox_resp.messages {
157 self.process_mailbox_message(mailbox_msg).await;
158 }
159
160 if !mailbox_resp.have_more {
161 break;
162 }
163 }
164
165 Ok(())
166 }
167
168 async fn process_raw_vtxos(
175 &self,
176 raw_vtxos: Vec<Vec<u8>>,
177 ) -> Vec<Vtxo<Full>> {
178 let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
179 let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());
180
181 for bytes in &raw_vtxos {
182 let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
183 Ok(vtxo) => vtxo,
184 Err(e) => {
185 error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
186 invalid_vtxos.push(bytes);
187 continue;
188 }
189 };
190
191 if let Err(e) = self.validate_vtxo(&vtxo).await {
192 error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
193 invalid_vtxos.push(bytes);
194 continue;
195 }
196
197 valid_vtxos.push(vtxo);
198 }
199
200 if !invalid_vtxos.is_empty() {
202 error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
203 }
204
205 valid_vtxos
206 }
207
208 async fn process_mailbox_message(
209 &self,
210 mailbox_msg: MailboxMessage,
211 ) {
212 match mailbox_msg.message {
213 Some(protos::mailbox_server::mailbox_message::Message::Arkoor(msg)) => {
214 let result = self
215 .process_received_arkoor_package(msg.vtxos, Some(mailbox_msg.checkpoint)).await;
216 if let Err(e) = result {
217 error!("Error processing received arkoor package: {:#}", e);
218 }
219 }
220 Some(protos::mailbox_server::mailbox_message::Message::RoundParticipationCompleted(_)) => {
221 if let Err(e) = self.sync_pending_rounds().await {
224 error!("Error syncing pending rounds: {:#}", e);
225 }
226 },
227 Some(protos::mailbox_server::mailbox_message::Message::IncomingLightningPayment(msg)) => {
228 if let Err(e) = self.handle_lightning_receive_notification(msg, mailbox_msg.checkpoint).await {
229 error!("Error handling lightning receive notification: {:#}", e);
230 }
231 },
232 Some(protos::mailbox_server::mailbox_message::Message::RecoveryVtxoIds(_)) => {
233 trace!("Received recovery VTXO IDs, ignoring");
234 }
235 None => {
236 warn!("Received unknown mailbox message, ignoring");
237 }
238 }
239 }
240
241 async fn process_received_arkoor_package(
242 &self,
243 raw_vtxos: Vec<Vec<u8>>,
244 checkpoint: Option<u64>,
245 ) -> anyhow::Result<()> {
246 let vtxos = self.process_raw_vtxos(raw_vtxos).await;
247
248 let mut new_vtxos = Vec::with_capacity(vtxos.len());
249 for vtxo in &vtxos {
250 if self.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
252 debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
253 continue;
254 }
255
256 trace!("Received arkoor VTXO {} for {} (checkpoint {:?})", vtxo.id(), vtxo.amount(), checkpoint);
257 new_vtxos.push(vtxo);
258 }
259
260 if new_vtxos.is_empty() {
261 return Ok(());
262 }
263
264 let balance = vtxos
265 .iter()
266 .map(|vtxo| vtxo.amount()).sum::<Amount>()
267 .to_signed()?;
268 self.store_spendable_vtxos(&vtxos).await?;
269
270 let mut received_by_address = HashMap::<ark::Address, Amount>::new();
272 for vtxo in &vtxos {
273 if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
274 if let Ok(address) = self.peek_address(index).await {
275 *received_by_address.entry(address).or_default() += vtxo.amount();
276 }
277 }
278 }
279 let received_on: Vec<_> = received_by_address
280 .iter()
281 .map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
282 .collect();
283
284 let movement_id = self.movements.new_finished_movement(
285 Subsystem::ARKOOR,
286 ArkoorMovement::Receive.to_string(),
287 MovementStatus::Successful,
288 MovementUpdate::new()
289 .produced_vtxos(&vtxos)
290 .intended_and_effective_balance(balance)
291 .received_on(received_on),
292 ).await?;
293
294 info!("Received arkoor (movement {}) for {}", movement_id, balance);
295
296 if let Some(checkpoint) = checkpoint {
297 self.store_mailbox_checkpoint(checkpoint).await?;
298 }
299
300 Ok(())
301 }
302
303 async fn handle_lightning_receive_notification(
308 &self,
309 notif: protos::mailbox_server::IncomingLightningPaymentMessage,
310 checkpoint: u64,
311 ) -> anyhow::Result<()> {
312 let payment_hash = PaymentHash::try_from(notif.payment_hash)
313 .context("invalid payment hash in lightning receive notification")?;
314
315 debug!("Lightning receive notification: payment_hash={}", payment_hash);
316
317 match self.try_claim_lightning_receive(payment_hash, false, None).await {
318 Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
319 Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
320 }
321
322 self.store_mailbox_checkpoint(checkpoint).await?;
323 Ok(())
324 }
325
326 pub async fn post_recovery_vtxo_ids(
328 &self,
329 vtxo_ids: impl IntoIterator<Item = VtxoId>,
330 ) -> anyhow::Result<()> {
331 let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
332 if vtxo_ids.is_empty() {
333 return Ok(());
334 }
335 let nb_vtxos = vtxo_ids.len();
336
337 let (mut srv, _) = self.require_server().await?;
338 let unblinded_id = self.recovery_mailbox_identifier().to_vec();
339 let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };
340
341 srv.mailbox_client.post_recovery_vtxo_ids(req).await
342 .context("error posting recovery vtxo IDs to server")?;
343
344 debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
345 Ok(())
346 }
347
348 async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
349 Ok(self.db.get_mailbox_checkpoint().await?)
350 }
351
352 async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
353 Ok(self.db.store_mailbox_checkpoint(checkpoint).await?)
354 }
355}