bark-wallet 0.2.0

Wallet library and CLI for the bitcoin Ark protocol built by Second
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453

pub extern crate ark;

pub extern crate bip39;
pub extern crate lightning_invoice;
pub extern crate lnurl as lnurllib;

use std::collections::HashMap;

use anyhow::Context;
use ark::tree::signed::UnlockHash;
use bitcoin::hashes::Hash;
use bitcoin::Amount;
use bitcoin::hex::DisplayHex;
use bitcoin::secp256k1::Keypair;
use futures::{FutureExt, Stream, StreamExt};
use log::{debug, error, info, trace, warn};
use tokio_util::sync::CancellationToken;

use ark::{ProtocolEncoding, Vtxo, VtxoId};
use ark::lightning::{PaymentHash, Preimage};
use ark::mailbox::{MailboxAuthorization, MailboxIdentifier};
use ark::vtxo::Full;
use server_rpc::protos;
use server_rpc::protos::mailbox_server::MailboxMessage;

use crate::Wallet;
use crate::movement::{MovementDestination, MovementStatus};
use crate::movement::update::MovementUpdate;
use crate::subsystem::{ArkoorMovement, Subsystem};


/// The maximum number of times we will call the fetch mailbox endpoint in one go
///
/// We can't trust the server to honestly tell us to keep trying more forever.
/// A malicious server could send us empty messages or invalid messages and
/// lock up our resources forever. So we limit the number of times we will fetch.
/// If a user actually has more messages left, he will have to call sync again.
///
/// (Note that currently the server sends 100 messages per fetch, so this would
/// only happen for users with more than 1000 pending items.)
const MAX_MAILBOX_REQUEST_BURST: usize = 10;

impl Wallet {
	/// Get the keypair used for the server mailbox
	pub fn mailbox_keypair(&self) -> Keypair {
		self.inner.seed.to_mailbox_keypair()
	}

	/// Get the keypair used for the server recovery mailbox
	pub fn recovery_mailbox_keypair(&self) -> Keypair {
		self.inner.seed.to_recovery_mailbox_keypair()
	}

	/// Get this wallet's server mailbox ID
	pub fn mailbox_identifier(&self) -> MailboxIdentifier {
		let mailbox_kp = self.mailbox_keypair();
		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
	}

	/// Get this wallet's server recovery mailbox ID
	pub fn recovery_mailbox_identifier(&self) -> MailboxIdentifier {
		let mailbox_kp = self.recovery_mailbox_keypair();
		MailboxIdentifier::from_pubkey(mailbox_kp.public_key())
	}

	/// Create a mailbox authorization that is valid until the given expiry time
	///
	/// This authorization can be used by third parties to lookup your mailbox
	/// with the Ark server.
	pub fn mailbox_authorization(
		&self,
		authorization_expiry: chrono::DateTime<chrono::Local>,
	) -> MailboxAuthorization {
		MailboxAuthorization::new(&self.mailbox_keypair(), authorization_expiry)
	}

	/// Subscribe to mailbox message stream.
	///
	/// If `since` is `None`, the stream will start from the last checkpoint stored in the database.
	///
	/// Returns a stream of mailbox messages.
	pub async fn subscribe_mailbox_messages(
		&self,
		since_checkpoint: Option<u64>,
	) -> anyhow::Result<impl Stream<Item = anyhow::Result<MailboxMessage>> + Unpin> {
		let (mut srv, _) = self.require_server().await?;

		let checkpoint = if let Some(since) = since_checkpoint {
			since
		} else {
			self.get_mailbox_checkpoint().await?
		};

		// we just need a short authorization for the stream initialization
		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10);
		let auth = self.mailbox_authorization(expiry);
		let mailbox_id = auth.mailbox();

		let req = protos::mailbox_server::MailboxRequest {
			unblinded_id: mailbox_id.serialize(),
			authorization: Some(auth.serialize()),
			checkpoint: checkpoint,
		};

		let stream = srv.mailbox_client.subscribe_mailbox(req).await?.into_inner().map(|m| {
			let m = m.context("received error on mailbox message stream")?;
			Ok::<_, anyhow::Error>(m)
		});

		Ok(stream)
	}

	/// Similar to [Wallet::subscribe_mailbox_messages] but it will also process each mailbox
	/// message indefinitely. This method won't stop until the given `shutdown` `CancellationToken`
	/// is triggered.
	///
	/// If `since_checkpoint` is `None`, the stream will start from the last checkpoint stored in
	/// the database.
	///
	/// Returns only once the stream is closed.
	pub async fn subscribe_process_mailbox_messages(
		&self,
		since_checkpoint: Option<u64>,
		shutdown: CancellationToken,
	) -> anyhow::Result<()> {
		let mut reconnect_count = 0;
		const MAX_RECONNECT_ATTEMPTS: usize = 5;

		'outer: loop {
			let mut stream = self.subscribe_mailbox_messages(since_checkpoint).await?;
			let connected_at = std::time::Instant::now();
			trace!("Connected to mailbox stream with server");

			loop {
				futures::select! {
					message = stream.next().fuse() => {
						if let Some(message) = message {
							reconnect_count = 0;
							let message = message.context("error on mailbox message stream")?;
							self.process_mailbox_message(message).await;
						} else if connected_at.elapsed() >= crate::HEALTHY_STREAM_DURATION {
							// Stream lived long enough that this is likely a
							// normal idle timeout, not a misbehaving server.
							reconnect_count = 0;
							info!("Mailbox stream closed after healthy session, reconnecting");
							continue 'outer;
						} else if reconnect_count >= MAX_RECONNECT_ATTEMPTS {
							bail!("Mailbox stream dropped by server, giving up to retry later");
						} else {
							reconnect_count += 1;
							warn!("Mailbox stream dropped by server, reconnecting");
							continue 'outer;
						}
					},
					_ = shutdown.cancelled().fuse() => {
						info!("Shutdown signal received! Shutting mailbox messages process...");
						return Ok(());
					},
				}
			}
		}
	}

	/// Sync with the mailbox on the Ark server and look for out-of-round received VTXOs.
	pub async fn sync_mailbox(&self) -> anyhow::Result<()> {
		let (mut srv, _) = self.require_server().await?;

		// we should be able to do all our syncing in 10 minutes
		let expiry = chrono::Local::now() + std::time::Duration::from_secs(10 * 60);
		let auth = self.mailbox_authorization(expiry);
		let mailbox_id = auth.mailbox();

		for _ in 0..MAX_MAILBOX_REQUEST_BURST {
			let checkpoint = self.get_mailbox_checkpoint().await?;
			let mailbox_req = protos::mailbox_server::MailboxRequest {
				unblinded_id: mailbox_id.serialize(),
				authorization: Some(auth.serialize()),
				checkpoint,
			};

			let mailbox_resp = srv.mailbox_client.read_mailbox(mailbox_req).await
				.context("error fetching mailbox")?.into_inner();
			debug!("Ark server has {} mailbox messages for us", mailbox_resp.messages.len());

			for mailbox_msg in mailbox_resp.messages {
				self.process_mailbox_message(mailbox_msg).await;
			}

			if !mailbox_resp.have_more {
				break;
			}
		}

		Ok(())
	}

	/// Turn raw byte arrays into VTXOs, then validate them.
	///
	/// This function doesn't return a result on purpose,
	/// because we want to make sure we don't early return on
	/// the first error. This ensure we process all VTXOs, even
	/// if some are invalid, and print everything we received.
	async fn process_raw_vtxos(
		&self,
		raw_vtxos: Vec<Vec<u8>>,
	) -> Vec<Vtxo<Full>> {
		let mut invalid_vtxos = Vec::with_capacity(raw_vtxos.len());
		let mut valid_vtxos = Vec::with_capacity(raw_vtxos.len());

		for bytes in &raw_vtxos {
			let vtxo = match Vtxo::<Full>::deserialize(&bytes) {
				Ok(vtxo) => vtxo,
				Err(e) => {
					error!("Failed to deserialize arkoor VTXO: {}: {}", bytes.as_hex(), e);
					invalid_vtxos.push(bytes);
					continue;
				}
			};

			if let Err(e) = self.validate_vtxo(&vtxo).await {
				error!("Received invalid arkoor VTXO {} from server: {}", vtxo.id(), e);
				invalid_vtxos.push(bytes);
				continue;
			}

			valid_vtxos.push(vtxo);
		}

		// We log all invalid VTXOs to keep track
		if !invalid_vtxos.is_empty() {
			error!("Received {} invalid arkoor VTXOs out of {} from server", invalid_vtxos.len(), raw_vtxos.len());
		}

		valid_vtxos
	}

	pub(crate) async fn process_mailbox_message(
		&self,
		mailbox_msg: MailboxMessage,
	) {
		use protos::mailbox_server::mailbox_message::Message;

		// Each arm returns whether the checkpoint should advance. Only
		// arkoor returns false on processing error so the server
		// redelivers and we retry. Every other arm advances regardless,
		// either because the work is idempotent and re-done on every
		// wallet sync, or because the message is informational/ignored.
		let advance = match mailbox_msg.message {
			Some(Message::Arkoor(msg)) => {
				match self.process_received_arkoor_package(msg.vtxos).await {
					Ok(()) => true,
					Err(e) => {
						error!("Error processing received arkoor package: {:#}", e);
						false
					}
				}
			}
			Some(Message::RoundParticipationCompleted(m)) => {
				info!("Server informed that round participation is ready, unlock_hash:{:?}",
					UnlockHash::from_slice(&m.unlock_hash).ok(),
				);
				if let Err(e) = self.sync_pending_rounds().await {
					error!("Error syncing pending rounds: {:#}", e);
				}
				true
			},
			Some(Message::IncomingLightningPayment(msg)) => {
				if let Err(e) = self.handle_lightning_receive_notification(msg).await {
					error!("Error handling lightning receive notification: {:#}", e);
				}
				true
			},
			Some(Message::RecoveryVtxoIds(_)) => {
				trace!("Received recovery VTXO IDs, ignoring");
				true
			}
			Some(Message::LightningSendFinished(msg)) => {
				if let Err(e) = self.handle_lightning_send_finished(msg, mailbox_msg.checkpoint).await {
					error!("Error handling lightning send finished notification: {:#}", e);
				}
				true
			}
			None => {
				warn!("Received unknown mailbox message kind at checkpoint {}; bark may need to be upgraded",
					mailbox_msg.checkpoint);
				true
			}
		};

		if advance {
			if let Err(e) = self.store_mailbox_checkpoint(mailbox_msg.checkpoint).await {
				error!("Error storing mailbox checkpoint: {:#}", e);
			}
		}
	}

	async fn process_received_arkoor_package(
		&self,
		raw_vtxos: Vec<Vec<u8>>,
	) -> anyhow::Result<()> {
		let vtxos = self.process_raw_vtxos(raw_vtxos).await;

		let mut new_vtxos = Vec::with_capacity(vtxos.len());
		for vtxo in &vtxos {
			// Skip if already in wallet
			if self.inner.db.get_wallet_vtxo(vtxo.id()).await?.is_some() {
				debug!("Ignoring duplicate arkoor VTXO {}", vtxo.id());
				continue;
			}

			trace!("Received arkoor VTXO {} for {}", vtxo.id(), vtxo.amount());
			new_vtxos.push(vtxo);
		}

		if new_vtxos.is_empty() {
			return Ok(());
		}

		// Redundantly re-register the received vtxos with the server. An
		// up-to-date sender already does this after cosign, but older
		// senders may not, so we do it on receive too to make sure the
		// server has signed_tx rows for our spendable vtxos. Any failure
		// is logged and swallowed: the receive must still proceed so we
		// don't lose track of the vtxos locally, and later spends will
		// retry registration if still needed.
		if let Err(e) = self.register_vtxo_transactions_with_server(&new_vtxos).await {
			warn!("Failed to register received arkoor vtxo transactions with server: {:#}", e);
		}

		let balance = vtxos
			.iter()
			.map(|vtxo| vtxo.amount()).sum::<Amount>()
			.to_signed()?;
		self.store_spendable_vtxos(&vtxos).await?;

		// Build received_on destinations from received VTXOs, aggregated by address
		let mut received_by_address = HashMap::<ark::Address, Amount>::new();
		for vtxo in &vtxos {
			if let Ok(Some((index, _))) = self.pubkey_keypair(&vtxo.user_pubkey()).await {
				if let Ok(address) = self.peek_address(index).await {
					*received_by_address.entry(address).or_default() += vtxo.amount();
				}
			}
		}
		let received_on: Vec<_> = received_by_address
			.iter()
			.map(|(addr, amount)| MovementDestination::ark(addr.clone(), *amount))
			.collect();

		let movement_id = self.inner.movements.new_finished_movement(
			Subsystem::ARKOOR,
			ArkoorMovement::Receive.to_string(),
			MovementStatus::Successful,
			MovementUpdate::new()
				.produced_vtxos(&vtxos)
				.intended_and_effective_balance(balance)
				.received_on(received_on),
		).await?;

		info!("Received arkoor (movement {}) for {}", movement_id, balance);

		Ok(())
	}

	/// Handle a lightning receive notification from the mailbox.
	///
	/// This is a signal that the server has received a lightning payment for us
	/// and we should come online to claim it.
	async fn handle_lightning_receive_notification(
		&self,
		notif: protos::mailbox_server::IncomingLightningPaymentMessage,
	) -> anyhow::Result<()> {
		let payment_hash = PaymentHash::try_from(notif.payment_hash)
			.context("invalid payment hash in lightning receive notification")?;

		debug!("Lightning receive notification: payment_hash={}", payment_hash);

		match self.try_claim_lightning_receive(payment_hash, false, None).await {
			Ok(_) => info!("Lightning receive claimed via mailbox notification for {}", payment_hash),
			Err(e) => error!("Failed to claim lightning receive for {}: {:#}", payment_hash, e),
		}

		Ok(())
	}

	/// Handle a lightning send finished notification from the mailbox.
	///
	/// This notification indicates that the server has completed processing
	/// a lightning payment we initiated, either successfully or with failure.
	async fn handle_lightning_send_finished(
		&self,
		notif: protos::mailbox_server::LightningSendFinishedMessage,
		checkpoint: u64,
	) -> anyhow::Result<()> {
		let payment_hash = PaymentHash::try_from(notif.payment_hash)
			.context("invalid payment hash in lightning send finished notification")?;

		let known_preimage = notif.preimage
			.and_then(|bytes| Preimage::try_from(bytes).ok());

		if known_preimage.is_some() {
			debug!("Lightning send finished notification (success): payment_hash={}", payment_hash);
		} else {
			debug!("Lightning send finished notification (failed): payment_hash={}", payment_hash);
		}

		// Trigger the payment check flow which will handle success/revocation.
		// When we already have the preimage, this skips the server RPC.
		// Errors are logged but not propagated: we always advance the checkpoint
		// to avoid re-processing the same notification on the next poll.
		match self.check_lightning_payment_with_preimage(payment_hash, known_preimage).await {
			Ok(_) => info!("Processed lightning send finished for {}", payment_hash),
			Err(e) => error!("Failed to process lightning send finished for {}: {:#}", payment_hash, e),
		}

		self.store_mailbox_checkpoint(checkpoint).await?;
		Ok(())
	}

	/// Post vtxo IDs to the server's recovery mailbox
	pub async fn post_recovery_vtxo_ids(
		&self,
		vtxo_ids: impl IntoIterator<Item = VtxoId>,
	) -> anyhow::Result<()> {
		let vtxo_ids = vtxo_ids.into_iter().map(|id| id.to_bytes().to_vec()).collect::<Vec<_>>();
		if vtxo_ids.is_empty() {
			return Ok(());
		}
		let nb_vtxos = vtxo_ids.len();

		let (mut srv, _) = self.require_server().await?;
		let unblinded_id = self.recovery_mailbox_identifier().serialize();
		let req = protos::mailbox_server::PostRecoveryVtxoIdsRequest { unblinded_id, vtxo_ids };

		srv.mailbox_client.post_recovery_vtxo_ids(req).await
			.context("error posting recovery vtxo IDs to server")?;

		debug!("Posted {} recovery vtxo IDs to server", nb_vtxos);
		Ok(())
	}

	/// Return the stored mailbox checkpoint — the tip position the wallet
	/// has consumed up to. After a successful [`Self::sync_mailbox`], this value
	/// reflects the server's latest advertised tip.
	pub async fn get_mailbox_checkpoint(&self) -> anyhow::Result<u64> {
		Ok(self.inner.db.get_mailbox_checkpoint().await?)
	}

	async fn store_mailbox_checkpoint(&self, checkpoint: u64) -> anyhow::Result<()> {
		Ok(self.inner.db.store_mailbox_checkpoint(checkpoint).await?)
	}
}