1#![deny(clippy::pedantic)]
2#![allow(clippy::enum_variant_names)]
3#![allow(clippy::missing_errors_doc)]
4#![allow(clippy::missing_panics_doc)]
5#![allow(clippy::module_name_repetitions)]
6#![allow(clippy::must_use_candidate)]
7
8mod api;
9#[cfg(feature = "cli")]
10mod cli;
11mod db;
12mod remote_receive_sm;
13
14use std::collections::BTreeMap;
15use std::sync::Arc;
16
17use async_stream::stream;
18use bitcoin::hashes::{Hash, sha256};
19use bitcoin::secp256k1;
20use db::{
21 ClaimedContractKey, FundedContractKey, FundedContractKeyPrefix, UnfundedContractInfo,
22 UnfundedContractKey,
23};
24use fedimint_api_client::api::DynModuleApi;
25use fedimint_client_module::module::init::{ClientModuleInit, ClientModuleInitArgs};
26use fedimint_client_module::module::recovery::NoModuleBackup;
27use fedimint_client_module::module::{ClientContext, ClientModule};
28use fedimint_client_module::sm::{Context, DynState, ModuleNotifier, State, StateTransition};
29use fedimint_client_module::transaction::{ClientInput, ClientInputBundle};
30use fedimint_client_module::{DynGlobalClientContext, sm_enum_variant_translation};
31use fedimint_core::config::FederationId;
32use fedimint_core::core::{Decoder, IntoDynInstance, ModuleInstanceId, ModuleKind, OperationId};
33use fedimint_core::db::{DatabaseTransaction, IDatabaseTransactionOpsCoreTyped};
34use fedimint_core::encoding::{Decodable, Encodable};
35use fedimint_core::module::{
36 ApiAuth, ApiVersion, CommonModuleInit, ModuleCommon, ModuleConsensusVersion, ModuleInit,
37 MultiApiVersion,
38};
39use fedimint_core::time::duration_since_epoch;
40use fedimint_core::util::SafeUrl;
41use fedimint_core::{Amount, OutPoint, apply, async_trait_maybe_send};
42use fedimint_lnv2_common::config::LightningClientConfig;
43use fedimint_lnv2_common::contracts::{IncomingContract, PaymentImage};
44use fedimint_lnv2_common::gateway_api::{
45 GatewayConnection, GatewayConnectionError, PaymentFee, RealGatewayConnection, RoutingInfo,
46};
47use fedimint_lnv2_common::{
48 Bolt11InvoiceDescription, ContractId, LightningInput, LightningInputV0, LightningModuleTypes,
49 MODULE_CONSENSUS_VERSION,
50};
51use futures::StreamExt;
52use lightning_invoice::Bolt11Invoice;
53use rand::rng;
54use rand::seq::SliceRandom;
55use secp256k1::{Keypair, PublicKey, Scalar, ecdh};
56use serde::{Deserialize, Serialize};
57use thiserror::Error;
58use tpe::{AggregateDecryptionKey, derive_agg_dk};
59
60use crate::api::LightningFederationApi;
61use crate::remote_receive_sm::{
62 RemoteReceiveSMCommon, RemoteReceiveSMState, RemoteReceiveStateMachine,
63};
64
65const KIND: ModuleKind = ModuleKind::from_static_str("lnv2");
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
68pub struct ClaimableContract {
69 pub contract: IncomingContract,
70 pub outpoint: OutPoint,
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct OperationMeta {
75 pub contract: IncomingContract,
76}
77
78#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
80pub enum FinalRemoteReceiveOperationState {
81 Funded,
83 Expired,
85}
86
87#[derive(Debug)]
88pub struct LightningRemoteCommonInit;
89
90impl CommonModuleInit for LightningRemoteCommonInit {
91 const CONSENSUS_VERSION: ModuleConsensusVersion = MODULE_CONSENSUS_VERSION;
92 const KIND: ModuleKind = KIND;
93
94 type ClientConfig = LightningClientConfig;
95
96 fn decoder() -> Decoder {
97 LightningModuleTypes::decoder()
98 }
99}
100
101#[derive(Debug, Clone)]
102pub struct LightningRemoteClientInit {
103 pub gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
104}
105
106impl Default for LightningRemoteClientInit {
107 fn default() -> Self {
108 LightningRemoteClientInit {
109 gateway_conn: Arc::new(RealGatewayConnection),
110 }
111 }
112}
113
114impl ModuleInit for LightningRemoteClientInit {
115 type Common = LightningRemoteCommonInit;
116
117 async fn dump_database(
118 &self,
119 _dbtx: &mut DatabaseTransaction<'_>,
120 _prefix_names: Vec<String>,
121 ) -> Box<dyn Iterator<Item = (String, Box<dyn erased_serde::Serialize + Send>)> + '_> {
122 Box::new(BTreeMap::new().into_iter())
123 }
124}
125
126#[apply(async_trait_maybe_send!)]
127impl ClientModuleInit for LightningRemoteClientInit {
128 type Module = LightningClientModule;
129
130 fn supported_api_versions(&self) -> MultiApiVersion {
131 MultiApiVersion::try_from_iter([ApiVersion { major: 0, minor: 0 }])
132 .expect("no version conflicts")
133 }
134
135 async fn init(&self, args: &ClientModuleInitArgs<Self>) -> anyhow::Result<Self::Module> {
136 Ok(LightningClientModule::new(
137 *args.federation_id(),
138 args.cfg().clone(),
139 args.notifier().clone(),
140 args.context(),
141 args.module_api().clone(),
142 args.module_root_secret()
143 .clone()
144 .to_secp_key(fedimint_core::secp256k1::SECP256K1),
145 self.gateway_conn.clone(),
146 args.admin_auth().cloned(),
147 ))
148 }
149}
150
151#[derive(Debug, Clone)]
152pub struct LightningClientContext {}
153
154impl Context for LightningClientContext {
155 const KIND: Option<ModuleKind> = Some(KIND);
156}
157
158#[derive(Debug)]
159pub struct LightningClientModule {
160 federation_id: FederationId,
161 cfg: LightningClientConfig,
162 notifier: ModuleNotifier<LightningClientStateMachines>,
163 client_ctx: ClientContext<Self>,
164 module_api: DynModuleApi,
165 keypair: Keypair,
166 gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
167 #[allow(unused)] admin_auth: Option<ApiAuth>,
169}
170
171#[apply(async_trait_maybe_send!)]
172impl ClientModule for LightningClientModule {
173 type Init = LightningRemoteClientInit;
174 type Common = LightningModuleTypes;
175 type Backup = NoModuleBackup;
176 type ModuleStateMachineContext = LightningClientContext;
177 type States = LightningClientStateMachines;
178
179 fn context(&self) -> Self::ModuleStateMachineContext {
180 LightningClientContext {}
181 }
182
183 fn input_fee(
184 &self,
185 amount: Amount,
186 _input: &<Self::Common as ModuleCommon>::Input,
187 ) -> Option<Amount> {
188 Some(self.cfg.fee_consensus.fee(amount))
189 }
190
191 fn output_fee(
192 &self,
193 amount: Amount,
194 _output: &<Self::Common as ModuleCommon>::Output,
195 ) -> Option<Amount> {
196 Some(self.cfg.fee_consensus.fee(amount))
197 }
198
199 #[cfg(feature = "cli")]
200 async fn handle_cli_command(
201 &self,
202 args: &[std::ffi::OsString],
203 ) -> anyhow::Result<serde_json::Value> {
204 cli::handle_cli_command(self, args).await
205 }
206}
207
208fn generate_ephemeral_tweak(static_pk: PublicKey) -> ([u8; 32], PublicKey) {
209 let keypair = Keypair::new(secp256k1::SECP256K1, &mut secp256k1::rand::thread_rng());
211
212 let tweak = ecdh::SharedSecret::new(&static_pk, &keypair.secret_key());
213
214 (tweak.secret_bytes(), keypair.public_key())
215}
216
217impl LightningClientModule {
218 #[allow(clippy::too_many_arguments)]
219 fn new(
220 federation_id: FederationId,
221 cfg: LightningClientConfig,
222 notifier: ModuleNotifier<LightningClientStateMachines>,
223 client_ctx: ClientContext<Self>,
224 module_api: DynModuleApi,
225 keypair: Keypair,
226 gateway_conn: Arc<dyn GatewayConnection + Send + Sync>,
227 admin_auth: Option<ApiAuth>,
228 ) -> Self {
229 Self {
230 federation_id,
231 cfg,
232 notifier,
233 client_ctx,
234 module_api,
235 keypair,
236 gateway_conn,
237 admin_auth,
238 }
239 }
240
241 async fn get_random_gateway(&self) -> Result<(SafeUrl, RoutingInfo), SelectGatewayError> {
242 let mut gateways = self
243 .module_api
244 .gateways()
245 .await
246 .map_err(|e| SelectGatewayError::FederationError(e.to_string()))?;
247
248 if gateways.is_empty() {
249 return Err(SelectGatewayError::NoVettedGateways);
250 }
251
252 gateways.shuffle(&mut rng());
253
254 for gateway in gateways {
255 if let Ok(Some(routing_info)) = self.routing_info(&gateway).await {
256 return Ok((gateway, routing_info));
257 }
258 }
259
260 Err(SelectGatewayError::FailedToFetchRoutingInfo)
261 }
262
263 async fn routing_info(
264 &self,
265 gateway: &SafeUrl,
266 ) -> Result<Option<RoutingInfo>, GatewayConnectionError> {
267 self.gateway_conn
268 .routing_info(gateway.clone(), &self.federation_id)
269 .await
270 }
271
272 pub fn get_public_key(&self) -> PublicKey {
273 self.keypair.public_key()
274 }
275
276 pub async fn remote_receive(
288 &self,
289 claimer_pk: PublicKey,
290 amount: Amount,
291 expiry_secs: u32,
292 description: Bolt11InvoiceDescription,
293 gateway: Option<SafeUrl>,
294 ) -> Result<(Bolt11Invoice, OperationId), RemoteReceiveError> {
295 let (invoice, contract) = self
296 .create_contract_and_fetch_invoice(
297 claimer_pk,
298 amount,
299 expiry_secs,
300 description,
301 gateway,
302 )
303 .await?;
304
305 let operation_id = self
306 .start_remote_receive_state_machine(contract.clone(), claimer_pk)
307 .await;
308
309 self.client_ctx
310 .module_db()
311 .autocommit(
312 |dbtx, _| {
313 Box::pin(async {
314 dbtx.insert_new_entry(
315 &UnfundedContractKey(contract.contract_id()),
316 &UnfundedContractInfo {
317 contract: contract.clone(),
318 claimer_pk,
319 },
320 )
321 .await;
322
323 Ok::<(), ()>(())
324 })
325 },
326 None,
327 )
328 .await
329 .expect("Autocommit has no retry limit");
330
331 Ok((invoice, operation_id))
332 }
333
334 pub async fn await_remote_receive(
338 &self,
339 operation_id: OperationId,
340 ) -> anyhow::Result<FinalRemoteReceiveOperationState> {
341 let operation = self.client_ctx.get_operation(operation_id).await?;
342 let mut stream = self.notifier.subscribe(operation_id).await;
343
344 Ok(self.client_ctx.outcome_or_updates(operation, operation_id, || {
347 stream! {
348 loop {
349 if let Some(LightningClientStateMachines::RemoteReceive(state)) = stream.next().await {
350 match state.state {
351 RemoteReceiveSMState::Pending => {},
354 RemoteReceiveSMState::Funded => {
355 yield FinalRemoteReceiveOperationState::Funded;
356 return;
357 },
358 RemoteReceiveSMState::Expired => {
359 yield FinalRemoteReceiveOperationState::Expired;
360 return;
361 },
362 }
363 }
364 }
365 }
366 }).into_stream().next().await.expect("Stream contains one final state"))
367 }
368
369 pub async fn get_claimable_contracts(
371 &self,
372 claimer_pk: PublicKey,
373 limit_or: Option<usize>,
374 ) -> Vec<ClaimableContract> {
375 let mut dbtx = self.client_ctx.module_db().begin_transaction_nc().await;
376
377 let contract_stream = dbtx
378 .find_by_prefix(&FundedContractKeyPrefix)
379 .await
380 .filter_map(|c| async move {
381 if c.1.claimer_pk == claimer_pk {
382 Some(ClaimableContract {
383 contract: c.1.contract,
384 outpoint: c.1.outpoint,
385 })
386 } else {
387 None
388 }
389 });
390
391 if let Some(limit) = limit_or {
392 contract_stream.take(limit).collect::<Vec<_>>().await
393 } else {
394 contract_stream.collect::<Vec<_>>().await
395 }
396 }
397
398 pub async fn remove_claimed_contracts(&self, contract_ids: Vec<ContractId>) {
402 self.client_ctx
403 .module_db()
404 .autocommit(
405 |dbtx, _| {
406 Box::pin(async {
407 for contract_id in &contract_ids {
408 debug_assert!(
409 dbtx.get_value(&UnfundedContractKey(*contract_id))
410 .await
411 .is_none(),
412 "Should never have access to IDs of unclaimed contracts"
413 );
414
415 dbtx.remove_entry(&FundedContractKey(*contract_id)).await;
416 }
417
418 Ok::<(), ()>(())
419 })
420 },
421 None,
422 )
423 .await
424 .expect("Autocommit has no retry limit");
425 }
426
427 pub async fn claim_contracts(
428 &self,
429 claimable_contracts: Vec<ClaimableContract>,
430 ) -> anyhow::Result<()> {
431 let operation_id = OperationId::from_encodable(
432 &claimable_contracts
433 .iter()
434 .map(|c| c.contract.clone())
435 .collect::<Vec<_>>(),
436 );
437
438 let mut dbtx = self.client_ctx.module_db().begin_transaction().await;
439
440 let mut client_inputs = Vec::new();
441
442 for claimable_contract in claimable_contracts {
443 let key = ClaimedContractKey(claimable_contract.contract.contract_id());
444
445 let contract_already_claimed = dbtx.get_value(&key).await.is_some();
446
447 if !contract_already_claimed {
448 dbtx.insert_new_entry(&key, &()).await;
449
450 let (claim_keypair, agg_decryption_key) = self
452 .recover_contract_keys(&claimable_contract.contract)
453 .unwrap();
454
455 client_inputs.push(ClientInput::<LightningInput> {
456 input: LightningInput::V0(LightningInputV0::Incoming(
457 claimable_contract.outpoint,
458 agg_decryption_key,
459 )),
460 amount: claimable_contract.contract.commitment.amount,
461 keys: vec![claim_keypair],
462 });
463 }
464 }
465
466 if client_inputs.is_empty() {
467 return Ok(());
468 }
469
470 let change_range = self
471 .client_ctx
472 .claim_inputs(
473 &mut dbtx.to_ref_nc(),
474 ClientInputBundle::new_no_sm(client_inputs),
475 operation_id,
476 )
477 .await
478 .expect("Cannot claim input, additional funding needed");
479
480 dbtx.commit_tx_result().await?;
481
482 self.client_ctx
488 .await_primary_module_outputs(operation_id, change_range.into_iter().collect())
489 .await?;
490
491 Ok(())
492 }
493
494 async fn create_contract_and_fetch_invoice(
497 &self,
498 claimer_pk: PublicKey,
499 amount: Amount,
500 expiry_secs: u32,
501 description: Bolt11InvoiceDescription,
502 gateway: Option<SafeUrl>,
503 ) -> Result<(Bolt11Invoice, IncomingContract), RemoteReceiveError> {
504 let (ephemeral_tweak, ephemeral_pk) = generate_ephemeral_tweak(claimer_pk);
505
506 let encryption_seed = ephemeral_tweak
507 .consensus_hash::<sha256::Hash>()
508 .to_byte_array();
509
510 let preimage = encryption_seed
511 .consensus_hash::<sha256::Hash>()
512 .to_byte_array();
513
514 let (gateway, routing_info) = match gateway {
515 Some(gateway) => (
516 gateway.clone(),
517 self.routing_info(&gateway)
518 .await
519 .map_err(RemoteReceiveError::GatewayConnectionError)?
520 .ok_or(RemoteReceiveError::UnknownFederation)?,
521 ),
522 None => self
523 .get_random_gateway()
524 .await
525 .map_err(RemoteReceiveError::FailedToSelectGateway)?,
526 };
527
528 if !routing_info.receive_fee.le(&PaymentFee::RECEIVE_FEE_LIMIT) {
529 return Err(RemoteReceiveError::PaymentFeeExceedsLimit);
530 }
531
532 let contract_amount = routing_info.receive_fee.subtract_from(amount.msats);
533
534 if contract_amount < Amount::from_sats(50) {
537 return Err(RemoteReceiveError::DustAmount);
538 }
539
540 let expiration = duration_since_epoch()
541 .as_secs()
542 .saturating_add(u64::from(expiry_secs));
543
544 let claim_pk = claimer_pk
545 .mul_tweak(
546 secp256k1::SECP256K1,
547 &Scalar::from_be_bytes(ephemeral_tweak).expect("Within curve order"),
548 )
549 .expect("Tweak is valid");
550
551 let contract = IncomingContract::new(
552 self.cfg.tpe_agg_pk,
553 encryption_seed,
554 preimage,
555 PaymentImage::Hash(preimage.consensus_hash()),
556 contract_amount,
557 expiration,
558 claim_pk,
559 routing_info.module_public_key,
560 ephemeral_pk,
561 );
562
563 let invoice = self
564 .gateway_conn
565 .bolt11_invoice(
566 gateway,
567 self.federation_id,
568 contract.clone(),
569 amount,
570 description,
571 expiry_secs,
572 )
573 .await
574 .map_err(RemoteReceiveError::GatewayConnectionError)?;
575
576 if invoice.payment_hash() != &preimage.consensus_hash() {
577 return Err(RemoteReceiveError::InvalidInvoicePaymentHash);
578 }
579
580 if invoice.amount_milli_satoshis() != Some(amount.msats) {
581 return Err(RemoteReceiveError::InvalidInvoiceAmount);
582 }
583
584 Ok((invoice, contract))
585 }
586
587 async fn start_remote_receive_state_machine(
590 &self,
591 contract: IncomingContract,
592 claimer_pubkey: PublicKey,
593 ) -> OperationId {
594 let operation_id = OperationId::from_encodable(&contract.clone());
595
596 let receive_sm = LightningClientStateMachines::RemoteReceive(RemoteReceiveStateMachine {
597 common: RemoteReceiveSMCommon {
598 operation_id,
599 claimer_pubkey,
600 contract: contract.clone(),
601 },
602 state: RemoteReceiveSMState::Pending,
603 });
604
605 self.client_ctx
608 .manual_operation_start(
609 operation_id,
610 LightningRemoteCommonInit::KIND.as_str(),
611 OperationMeta { contract },
612 vec![self.client_ctx.make_dyn_state(receive_sm)],
613 )
614 .await
615 .ok();
616
617 operation_id
618 }
619
620 fn recover_contract_keys(
621 &self,
622 contract: &IncomingContract,
623 ) -> Option<(Keypair, AggregateDecryptionKey)> {
624 let ephemeral_tweak = ecdh::SharedSecret::new(
625 &contract.commitment.ephemeral_pk,
626 &self.keypair.secret_key(),
627 )
628 .secret_bytes();
629
630 let encryption_seed = ephemeral_tweak
631 .consensus_hash::<sha256::Hash>()
632 .to_byte_array();
633
634 let claim_keypair = self
635 .keypair
636 .secret_key()
637 .mul_tweak(&Scalar::from_be_bytes(ephemeral_tweak).expect("Within curve order"))
638 .expect("Tweak is valid")
639 .keypair(secp256k1::SECP256K1);
640
641 if claim_keypair.public_key() != contract.commitment.claim_pk {
642 return None; }
644
645 let agg_decryption_key = derive_agg_dk(&self.cfg.tpe_agg_pk, &encryption_seed);
646
647 if !contract.verify_agg_decryption_key(&self.cfg.tpe_agg_pk, &agg_decryption_key) {
648 return None; }
650
651 contract.decrypt_preimage(&agg_decryption_key)?;
652
653 Some((claim_keypair, agg_decryption_key))
654 }
655}
656
657#[derive(Error, Debug, Clone, Eq, PartialEq)]
658pub enum SelectGatewayError {
659 #[error("Federation returned an error: {0}")]
660 FederationError(String),
661 #[error("The federation has no vetted gateways")]
662 NoVettedGateways,
663 #[error("All vetted gateways failed to respond on request of the routing info")]
664 FailedToFetchRoutingInfo,
665}
666
667#[derive(Error, Debug, Clone, Eq, PartialEq)]
668pub enum RemoteReceiveError {
669 #[error("Failed to select gateway: {0}")]
670 FailedToSelectGateway(SelectGatewayError),
671 #[error("Gateway connection error: {0}")]
672 GatewayConnectionError(GatewayConnectionError),
673 #[error("The gateway does not support our federation")]
674 UnknownFederation,
675 #[error("The gateways fee exceeds the limit")]
676 PaymentFeeExceedsLimit,
677 #[error("The total fees required to complete this payment exceed its amount")]
678 DustAmount,
679 #[error("The invoice's payment hash is incorrect")]
680 InvalidInvoicePaymentHash,
681 #[error("The invoice's amount is incorrect")]
682 InvalidInvoiceAmount,
683 #[error("The pubkey of the claimer is not registered")]
684 UnregisteredClaimer,
685}
686
687#[derive(Debug, Clone, Eq, PartialEq, Hash, Decodable, Encodable)]
689pub enum LightningClientStateMachines {
690 RemoteReceive(RemoteReceiveStateMachine),
691}
692
693impl IntoDynInstance for LightningClientStateMachines {
694 type DynType = DynState;
695
696 fn into_dyn(self, instance_id: ModuleInstanceId) -> Self::DynType {
697 DynState::from_typed(instance_id, self)
698 }
699}
700
701impl State for LightningClientStateMachines {
702 type ModuleContext = LightningClientContext;
703
704 fn transitions(
705 &self,
706 context: &Self::ModuleContext,
707 global_context: &DynGlobalClientContext,
708 ) -> Vec<StateTransition<Self>> {
709 match self {
710 LightningClientStateMachines::RemoteReceive(state) => {
711 sm_enum_variant_translation!(
712 state.transitions(context, global_context),
713 LightningClientStateMachines::RemoteReceive
714 )
715 }
716 }
717 }
718
719 fn operation_id(&self) -> OperationId {
720 match self {
721 LightningClientStateMachines::RemoteReceive(state) => state.operation_id(),
722 }
723 }
724}