1use alloc::boxed::Box;
11use alloc::string::ToString;
12use alloc::vec::Vec;
13
14use crate::events::{EventQueue, LiquidityEvent};
15use crate::lsps0::client::LSPS0ClientHandler;
16use crate::lsps0::msgs::LSPS0Message;
17use crate::lsps0::ser::{
18 LSPSMessage, LSPSMethod, LSPSProtocolMessageHandler, LSPSRequestId, LSPSResponseError,
19 RawLSPSMessage, JSONRPC_INVALID_MESSAGE_ERROR_CODE, JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE,
20 LSPS_MESSAGE_TYPE_ID,
21};
22use crate::lsps0::service::LSPS0ServiceHandler;
23use crate::lsps5::client::{LSPS5ClientConfig, LSPS5ClientHandler};
24use crate::lsps5::msgs::LSPS5Message;
25use crate::lsps5::service::{LSPS5ServiceConfig, LSPS5ServiceHandler};
26use crate::message_queue::MessageQueue;
27use crate::persist::{
28 read_event_queue, read_lsps2_service_peer_states, read_lsps5_service_peer_states,
29};
30
31use crate::lsps1::client::{LSPS1ClientConfig, LSPS1ClientHandler};
32use crate::lsps1::msgs::LSPS1Message;
33#[cfg(lsps1_service)]
34use crate::lsps1::service::{LSPS1ServiceConfig, LSPS1ServiceHandler};
35
36use crate::lsps2::client::{LSPS2ClientConfig, LSPS2ClientHandler};
37use crate::lsps2::msgs::LSPS2Message;
38use crate::lsps2::service::{LSPS2ServiceConfig, LSPS2ServiceHandler, LSPS2ServiceHandlerSync};
39use crate::prelude::{new_hash_map, new_hash_set, HashMap, HashSet};
40use crate::sync::{Arc, Mutex, RwLock};
41use crate::utils::async_poll::dummy_waker;
42#[cfg(feature = "time")]
43use crate::utils::time::DefaultTimeProvider;
44use crate::utils::time::TimeProvider;
45
46use lightning::chain::chaininterface::BroadcasterInterface;
47use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
48use lightning::ln::channelmanager::{AChannelManager, ChainParameters};
49use lightning::ln::msgs::{ErrorAction, LightningError};
50use lightning::ln::peer_handler::CustomMessageHandler;
51use lightning::ln::wire::CustomMessageReader;
52use lightning::sign::{EntropySource, NodeSigner};
53use lightning::util::logger::Level;
54use lightning::util::persist::{KVStore, KVStoreSync, KVStoreSyncWrapper};
55use lightning::util::ser::{LengthLimitedRead, LengthReadable};
56use lightning::util::wakers::{Future, Notifier};
57
58use lightning_types::features::{InitFeatures, NodeFeatures};
59
60use bitcoin::secp256k1::PublicKey;
61
62use core::future::Future as StdFuture;
63use core::ops::Deref;
64use core::task;
65
/// Custom feature bit that [`LiquidityManager`] sets (as optional) in the node and init features
/// it provides when `LiquidityServiceConfig::advertise_service` is `true`.
const LSPS_FEATURE_BIT: usize = 729;
67
/// Configuration for the service-side (LSP) protocol handlers of a [`LiquidityManager`].
///
/// Each `Option` field enables the corresponding service handler when it is `Some`; when it is
/// `None` no handler for that protocol is constructed.
#[derive(Clone)]
pub struct LiquidityServiceConfig {
    /// Optional configuration for the LSPS1 service handler.
    ///
    /// Only present when the crate is built with `cfg(lsps1_service)`.
    #[cfg(lsps1_service)]
    pub lsps1_service_config: Option<LSPS1ServiceConfig>,
    /// Optional configuration for the LSPS2 service handler.
    pub lsps2_service_config: Option<LSPS2ServiceConfig>,
    /// Optional configuration for the LSPS5 service handler.
    pub lsps5_service_config: Option<LSPS5ServiceConfig>,
    /// Whether the LSPS feature bit is set in the node and init features the manager provides.
    pub advertise_service: bool,
}
86
/// Configuration for the client-side protocol handlers of a [`LiquidityManager`].
///
/// Each `Option` field enables the corresponding client handler when it is `Some`; when it is
/// `None` no handler for that protocol is constructed.
#[derive(Clone)]
pub struct LiquidityClientConfig {
    /// Optional configuration for the LSPS1 client handler.
    pub lsps1_client_config: Option<LSPS1ClientConfig>,
    /// Optional configuration for the LSPS2 client handler.
    pub lsps2_client_config: Option<LSPS2ClientConfig>,
    /// Optional configuration for the LSPS5 client handler.
    pub lsps5_client_config: Option<LSPS5ClientConfig>,
}
100
/// A trait describing any concrete [`LiquidityManager`] instantiation.
///
/// It exposes each of the manager's generic parameters as a pair of associated types (the
/// `Deref` wrapper and its target), so that code can be bounded on `ALiquidityManager` instead
/// of spelling out all seven type parameters.
pub trait ALiquidityManager {
    /// The target type of [`Self::ES`]; must implement [`EntropySource`].
    type EntropySource: EntropySource + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::EntropySource`].
    type ES: Deref<Target = Self::EntropySource> + Clone;
    /// The target type of [`Self::NS`]; must implement [`NodeSigner`].
    type NodeSigner: NodeSigner + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::NodeSigner`].
    type NS: Deref<Target = Self::NodeSigner> + Clone;
    /// The target type of [`Self::CM`]; must implement [`AChannelManager`].
    type AChannelManager: AChannelManager + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::AChannelManager`].
    type CM: Deref<Target = Self::AChannelManager> + Clone;
    /// The target type of [`Self::C`]; must implement [`Filter`].
    type Filter: Filter + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::Filter`].
    type C: Deref<Target = Self::Filter> + Clone;
    /// The target type of [`Self::K`]; must implement [`KVStore`].
    type KVStore: KVStore + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::KVStore`].
    type K: Deref<Target = Self::KVStore> + Clone;
    /// The target type of [`Self::TP`]; must implement [`TimeProvider`].
    type TimeProvider: TimeProvider + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::TimeProvider`].
    type TP: Deref<Target = Self::TimeProvider> + Clone;
    /// The target type of [`Self::T`]; must implement [`BroadcasterInterface`].
    type BroadcasterInterface: BroadcasterInterface + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::BroadcasterInterface`].
    type T: Deref<Target = Self::BroadcasterInterface> + Clone;
    /// Returns a reference to the concrete [`LiquidityManager`].
    fn get_lm(
        &self,
    ) -> &LiquidityManager<Self::ES, Self::NS, Self::CM, Self::C, Self::K, Self::TP, Self::T>;
}
139
// Every `LiquidityManager` is trivially an `ALiquidityManager`: the associated types simply
// mirror the struct's own generic parameters and their `Deref` targets.
impl<
        ES: Deref + Clone,
        NS: Deref + Clone,
        CM: Deref + Clone,
        C: Deref + Clone,
        K: Deref + Clone,
        TP: Deref + Clone,
        T: Deref + Clone,
    > ALiquidityManager for LiquidityManager<ES, NS, CM, C, K, TP, T>
where
    ES::Target: EntropySource,
    NS::Target: NodeSigner,
    CM::Target: AChannelManager,
    C::Target: Filter,
    K::Target: KVStore,
    TP::Target: TimeProvider,
    T::Target: BroadcasterInterface,
{
    type EntropySource = ES::Target;
    type ES = ES;
    type NodeSigner = NS::Target;
    type NS = NS;
    type AChannelManager = CM::Target;
    type CM = CM;
    type Filter = C::Target;
    type C = C;
    type KVStore = K::Target;
    type K = K;
    type TimeProvider = TP::Target;
    type TP = TP;
    type BroadcasterInterface = T::Target;
    type T = T;
    /// Returns `self`, since the implementor already is the concrete manager.
    fn get_lm(&self) -> &LiquidityManager<ES, NS, CM, C, K, TP, T> {
        self
    }
}
176
/// A trait describing any concrete [`LiquidityManagerSync`] instantiation.
///
/// It exposes each of the manager's generic parameters as a pair of associated types (the
/// `Deref` wrapper and its target), so that code can be bounded on `ALiquidityManagerSync`
/// instead of spelling out all seven type parameters.
pub trait ALiquidityManagerSync {
    /// The target type of [`Self::ES`]; must implement [`EntropySource`].
    type EntropySource: EntropySource + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::EntropySource`].
    type ES: Deref<Target = Self::EntropySource> + Clone;
    /// The target type of [`Self::NS`]; must implement [`NodeSigner`].
    type NodeSigner: NodeSigner + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::NodeSigner`].
    type NS: Deref<Target = Self::NodeSigner> + Clone;
    /// The target type of [`Self::CM`]; must implement [`AChannelManager`].
    type AChannelManager: AChannelManager + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::AChannelManager`].
    type CM: Deref<Target = Self::AChannelManager> + Clone;
    /// The target type of [`Self::C`]; must implement [`Filter`].
    type Filter: Filter + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::Filter`].
    type C: Deref<Target = Self::Filter> + Clone;
    /// The target type of [`Self::KS`]; must implement [`KVStoreSync`].
    type KVStoreSync: KVStoreSync + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::KVStoreSync`].
    type KS: Deref<Target = Self::KVStoreSync> + Clone;
    /// The target type of [`Self::TP`]; must implement [`TimeProvider`].
    type TimeProvider: TimeProvider + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::TimeProvider`].
    type TP: Deref<Target = Self::TimeProvider> + Clone;
    /// The target type of [`Self::T`]; must implement [`BroadcasterInterface`].
    type BroadcasterInterface: BroadcasterInterface + ?Sized;
    /// A cloneable pointer-like type dereferencing to [`Self::BroadcasterInterface`].
    type T: Deref<Target = Self::BroadcasterInterface> + Clone;
    /// Returns a reference to the async [`LiquidityManager`] operating on the wrapped
    /// (`KVStoreSyncWrapper`) store.
    ///
    /// Only compiled for tests or with the `_test_utils` feature.
    #[cfg(any(test, feature = "_test_utils"))]
    fn get_lm_async(
        &self,
    ) -> &LiquidityManager<
        Self::ES,
        Self::NS,
        Self::CM,
        Self::C,
        KVStoreSyncWrapper<Self::KS>,
        Self::TP,
        Self::T,
    >;
    /// Returns a reference to the concrete [`LiquidityManagerSync`].
    fn get_lm(
        &self,
    ) -> &LiquidityManagerSync<Self::ES, Self::NS, Self::CM, Self::C, Self::KS, Self::TP, Self::T>;
}
228
// Every `LiquidityManagerSync` is trivially an `ALiquidityManagerSync`: the associated types
// simply mirror the struct's own generic parameters and their `Deref` targets.
impl<
        ES: Deref + Clone,
        NS: Deref + Clone,
        CM: Deref + Clone,
        C: Deref + Clone,
        KS: Deref + Clone,
        TP: Deref + Clone,
        T: Deref + Clone,
    > ALiquidityManagerSync for LiquidityManagerSync<ES, NS, CM, C, KS, TP, T>
where
    ES::Target: EntropySource,
    NS::Target: NodeSigner,
    CM::Target: AChannelManager,
    C::Target: Filter,
    KS::Target: KVStoreSync,
    TP::Target: TimeProvider,
    T::Target: BroadcasterInterface,
{
    type EntropySource = ES::Target;
    type ES = ES;
    type NodeSigner = NS::Target;
    type NS = NS;
    type AChannelManager = CM::Target;
    type CM = CM;
    type Filter = C::Target;
    type C = C;
    type KVStoreSync = KS::Target;
    type KS = KS;
    type TimeProvider = TP::Target;
    type TP = TP;
    type BroadcasterInterface = T::Target;
    type T = T;
    /// Returns the wrapped async manager stored in `self.inner`.
    ///
    /// Only compiled for tests or with the `_test_utils` feature.
    #[cfg(any(test, feature = "_test_utils"))]
    fn get_lm_async(
        &self,
    ) -> &LiquidityManager<
        Self::ES,
        Self::NS,
        Self::CM,
        Self::C,
        KVStoreSyncWrapper<Self::KS>,
        Self::TP,
        Self::T,
    > {
        &self.inner
    }
    /// Returns `self`, since the implementor already is the concrete sync manager.
    fn get_lm(&self) -> &LiquidityManagerSync<ES, NS, CM, C, KS, TP, T> {
        self
    }
}
280
/// The main entry point of the crate: owns the LSPS0/LSPS1/LSPS2/LSPS5 client and service
/// handlers, routes incoming LSPS messages to them, and exposes the resulting outbound messages
/// and events.
///
/// It implements `CustomMessageReader` and `CustomMessageHandler` so that it can be plugged
/// into the peer handling layer, and `Listen` / `Confirm` so that it can track the best block.
pub struct LiquidityManager<
    ES: Deref + Clone,
    NS: Deref + Clone,
    CM: Deref + Clone,
    C: Deref + Clone,
    K: Deref + Clone,
    TP: Deref + Clone,
    T: Deref + Clone,
> where
    ES::Target: EntropySource,
    NS::Target: NodeSigner,
    CM::Target: AChannelManager,
    C::Target: Filter,
    K::Target: KVStore,
    TP::Target: TimeProvider,
    T::Target: BroadcasterInterface,
{
    // Outbound message queue shared with all handlers; drained by `get_and_clear_pending_msg`.
    pending_messages: Arc<MessageQueue>,
    // Event queue shared with all handlers; surfaced through the `*_event*` methods.
    pending_events: Arc<EventQueue<K>>,
    // Maps the id of each request we sent to its method. Filled when outbound messages are
    // drained and passed to the parser when an incoming message is deserialized.
    request_id_to_method_map: Mutex<HashMap<LSPSRequestId, LSPSMethod>>,
    // Peers that sent a message we failed to deserialize. Their messages are rejected until
    // they disconnect, at which point they are removed from the set again.
    ignored_peers: RwLock<HashSet<PublicKey>>,
    lsps0_client_handler: LSPS0ClientHandler<ES, K>,
    // Present whenever a service config was given.
    lsps0_service_handler: Option<LSPS0ServiceHandler>,
    #[cfg(lsps1_service)]
    lsps1_service_handler: Option<LSPS1ServiceHandler<ES, CM, C, K>>,
    lsps1_client_handler: Option<LSPS1ClientHandler<ES, K>>,
    lsps2_service_handler: Option<LSPS2ServiceHandler<CM, K, T>>,
    lsps2_client_handler: Option<LSPS2ClientHandler<ES, K>>,
    lsps5_service_handler: Option<LSPS5ServiceHandler<CM, NS, K, TP>>,
    lsps5_client_handler: Option<LSPS5ClientHandler<ES, K>>,
    // Kept to decide whether the LSPS feature bit is advertised.
    service_config: Option<LiquidityServiceConfig>,
    // Stored but not read anywhere in the code visible here.
    _client_config: Option<LiquidityClientConfig>,
    // Last best block we were told about; `None` until chain parameters or a block are provided.
    best_block: RwLock<Option<BestBlock>>,
    // Stored but not read anywhere in the code visible here.
    _chain_source: Option<C>,
    // Shared with the message and event queues; its future is handed out by
    // `get_pending_msgs_or_needs_persist_future`.
    pending_msgs_or_needs_persist_notifier: Arc<Notifier>,
}
337
#[cfg(feature = "time")]
impl<
        ES: Deref + Clone,
        NS: Deref + Clone,
        CM: Deref + Clone,
        C: Deref + Clone,
        K: Deref + Clone,
        T: Deref + Clone,
    > LiquidityManager<ES, NS, CM, C, K, DefaultTimeProvider, T>
where
    ES::Target: EntropySource,
    NS::Target: NodeSigner,
    CM::Target: AChannelManager,
    C::Target: Filter,
    K::Target: KVStore,
    T::Target: BroadcasterInterface,
{
    /// Constructs a [`LiquidityManager`] that uses [`DefaultTimeProvider`] as its time source.
    ///
    /// This is a convenience wrapper around [`Self::new_with_custom_time_provider`] and is only
    /// available with the `time` feature.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`Self::new_with_custom_time_provider`].
    pub async fn new(
        entropy_source: ES, node_signer: NS, channel_manager: CM, chain_source: Option<C>,
        chain_params: Option<ChainParameters>, kv_store: K, transaction_broadcaster: T,
        service_config: Option<LiquidityServiceConfig>,
        client_config: Option<LiquidityClientConfig>,
    ) -> Result<Self, lightning::io::Error> {
        // NOTE: the callee takes its arguments in a different order than this function
        // (`transaction_broadcaster` comes right after `channel_manager`), so the arguments
        // are deliberately passed in the callee's order here.
        Self::new_with_custom_time_provider(
            entropy_source,
            node_signer,
            channel_manager,
            transaction_broadcaster,
            chain_source,
            chain_params,
            kv_store,
            service_config,
            client_config,
            DefaultTimeProvider,
        )
        .await
    }
}
379
380impl<
381 ES: Deref + Clone,
382 NS: Deref + Clone,
383 CM: Deref + Clone,
384 C: Deref + Clone,
385 K: Deref + Clone,
386 TP: Deref + Clone,
387 T: Deref + Clone,
388 > LiquidityManager<ES, NS, CM, C, K, TP, T>
389where
390 ES::Target: EntropySource,
391 NS::Target: NodeSigner,
392 CM::Target: AChannelManager,
393 C::Target: Filter,
394 K::Target: KVStore,
395 TP::Target: TimeProvider,
396 T::Target: BroadcasterInterface,
397{
398 pub async fn new_with_custom_time_provider(
407 entropy_source: ES, node_signer: NS, channel_manager: CM, transaction_broadcaster: T,
408 chain_source: Option<C>, chain_params: Option<ChainParameters>, kv_store: K,
409 service_config: Option<LiquidityServiceConfig>,
410 client_config: Option<LiquidityClientConfig>, time_provider: TP,
411 ) -> Result<Self, lightning::io::Error> {
412 let pending_msgs_or_needs_persist_notifier = Arc::new(Notifier::new());
413 let pending_messages =
414 Arc::new(MessageQueue::new(Arc::clone(&pending_msgs_or_needs_persist_notifier)));
415 let persisted_queue = read_event_queue(kv_store.clone()).await?.unwrap_or_default();
416 let pending_events = Arc::new(EventQueue::new(
417 persisted_queue,
418 kv_store.clone(),
419 Arc::clone(&pending_msgs_or_needs_persist_notifier),
420 ));
421 let ignored_peers = RwLock::new(new_hash_set());
422
423 let mut supported_protocols = Vec::new();
424
425 let lsps2_client_handler = client_config.as_ref().and_then(|config| {
426 config.lsps2_client_config.map(|config| {
427 LSPS2ClientHandler::new(
428 entropy_source.clone(),
429 Arc::clone(&pending_messages),
430 Arc::clone(&pending_events),
431 config.clone(),
432 )
433 })
434 });
435
436 let lsps2_service_handler = if let Some(service_config) = service_config.as_ref() {
437 if let Some(lsps2_service_config) = service_config.lsps2_service_config.as_ref() {
438 if let Some(number) =
439 <LSPS2ServiceHandler<CM, K, T> as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER
440 {
441 supported_protocols.push(number);
442 }
443
444 let peer_states = read_lsps2_service_peer_states(kv_store.clone()).await?;
445 Some(LSPS2ServiceHandler::new(
446 peer_states,
447 Arc::clone(&pending_messages),
448 Arc::clone(&pending_events),
449 channel_manager.clone(),
450 kv_store.clone(),
451 transaction_broadcaster.clone(),
452 lsps2_service_config.clone(),
453 )?)
454 } else {
455 None
456 }
457 } else {
458 None
459 };
460
461 let lsps5_client_handler = client_config.as_ref().and_then(|config| {
462 config.lsps5_client_config.as_ref().map(|config| {
463 LSPS5ClientHandler::new(
464 entropy_source.clone(),
465 Arc::clone(&pending_messages),
466 Arc::clone(&pending_events),
467 config.clone(),
468 )
469 })
470 });
471
472 let lsps5_service_handler = if let Some(service_config) = service_config.as_ref() {
473 if let Some(lsps5_service_config) = service_config.lsps5_service_config.as_ref() {
474 if let Some(number) =
475 <LSPS5ServiceHandler<CM, NS, K, TP> as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER
476 {
477 supported_protocols.push(number);
478 }
479
480 let peer_states = read_lsps5_service_peer_states(kv_store.clone()).await?;
481 Some(LSPS5ServiceHandler::new_with_time_provider(
482 peer_states,
483 Arc::clone(&pending_events),
484 Arc::clone(&pending_messages),
485 channel_manager.clone(),
486 kv_store.clone(),
487 node_signer,
488 lsps5_service_config.clone(),
489 time_provider,
490 ))
491 } else {
492 None
493 }
494 } else {
495 None
496 };
497
498 let lsps1_client_handler = client_config.as_ref().and_then(|config| {
499 config.lsps1_client_config.as_ref().map(|config| {
500 LSPS1ClientHandler::new(
501 entropy_source.clone(),
502 Arc::clone(&pending_messages),
503 Arc::clone(&pending_events),
504 config.clone(),
505 )
506 })
507 });
508
509 #[cfg(lsps1_service)]
510 let lsps1_service_handler = service_config.as_ref().and_then(|config| {
511 if let Some(number) =
512 <LSPS1ServiceHandler<ES, CM, C, K> as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER
513 {
514 supported_protocols.push(number);
515 }
516 config.lsps1_service_config.as_ref().map(|config| {
517 LSPS1ServiceHandler::new(
518 entropy_source.clone(),
519 Arc::clone(&pending_messages),
520 Arc::clone(&pending_events),
521 channel_manager.clone(),
522 chain_source.clone(),
523 config.clone(),
524 )
525 })
526 });
527
528 let lsps0_client_handler = LSPS0ClientHandler::new(
529 entropy_source.clone(),
530 Arc::clone(&pending_messages),
531 Arc::clone(&pending_events),
532 );
533
534 let lsps0_service_handler = if service_config.is_some() {
535 Some(LSPS0ServiceHandler::new(supported_protocols, Arc::clone(&pending_messages)))
536 } else {
537 None
538 };
539
540 Ok(Self {
541 pending_messages,
542 pending_events,
543 request_id_to_method_map: Mutex::new(new_hash_map()),
544 ignored_peers,
545 lsps0_client_handler,
546 lsps0_service_handler,
547 lsps1_client_handler,
548 #[cfg(lsps1_service)]
549 lsps1_service_handler,
550 lsps2_client_handler,
551 lsps2_service_handler,
552 lsps5_client_handler,
553 lsps5_service_handler,
554 service_config,
555 _client_config: client_config,
556 best_block: RwLock::new(chain_params.map(|chain_params| chain_params.best_block)),
557 _chain_source: chain_source,
558 pending_msgs_or_needs_persist_notifier,
559 })
560 }
561
    /// Returns a reference to the LSPS0 client-side handler, which is always present.
    pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler<ES, K> {
        &self.lsps0_client_handler
    }
566
    /// Returns a reference to the LSPS0 service-side handler.
    ///
    /// Returns `None` if no [`LiquidityServiceConfig`] was provided at construction.
    pub fn lsps0_service_handler(&self) -> Option<&LSPS0ServiceHandler> {
        self.lsps0_service_handler.as_ref()
    }
571
    /// Returns a reference to the LSPS1 client-side handler.
    ///
    /// Returns `None` if no `lsps1_client_config` was provided at construction.
    pub fn lsps1_client_handler(&self) -> Option<&LSPS1ClientHandler<ES, K>> {
        self.lsps1_client_handler.as_ref()
    }
579
    /// Returns a reference to the LSPS1 service-side handler.
    ///
    /// Returns `None` if no `lsps1_service_config` was provided at construction. Only
    /// available when built with `cfg(lsps1_service)`.
    #[cfg(lsps1_service)]
    pub fn lsps1_service_handler(&self) -> Option<&LSPS1ServiceHandler<ES, CM, C, K>> {
        self.lsps1_service_handler.as_ref()
    }
585
    /// Returns a reference to the LSPS2 client-side handler.
    ///
    /// Returns `None` if no `lsps2_client_config` was provided at construction.
    pub fn lsps2_client_handler(&self) -> Option<&LSPS2ClientHandler<ES, K>> {
        self.lsps2_client_handler.as_ref()
    }
594
    /// Returns a reference to the LSPS2 service-side handler.
    ///
    /// Returns `None` if no `lsps2_service_config` was provided at construction.
    pub fn lsps2_service_handler(&self) -> Option<&LSPS2ServiceHandler<CM, K, T>> {
        self.lsps2_service_handler.as_ref()
    }
601
    /// Returns a reference to the LSPS5 client-side handler.
    ///
    /// Returns `None` if no `lsps5_client_config` was provided at construction.
    pub fn lsps5_client_handler(&self) -> Option<&LSPS5ClientHandler<ES, K>> {
        self.lsps5_client_handler.as_ref()
    }
608
    /// Returns a reference to the LSPS5 service-side handler.
    ///
    /// Returns `None` if no `lsps5_service_config` was provided at construction.
    pub fn lsps5_service_handler(&self) -> Option<&LSPS5ServiceHandler<CM, NS, K, TP>> {
        self.lsps5_service_handler.as_ref()
    }
615
    /// Returns a [`Future`] obtained from the notifier that is shared with the message queue
    /// and the event queue.
    ///
    /// NOTE(review): judging by the name, the future completes when there are pending messages
    /// to send or state that needs persisting; the exact wake-up conditions are defined by
    /// `MessageQueue` / `EventQueue` and are not visible here.
    pub fn get_pending_msgs_or_needs_persist_future(&self) -> Future {
        self.pending_msgs_or_needs_persist_notifier.get_future()
    }
624
    /// Returns the next event from the event queue by delegating to
    /// `EventQueue::wait_next_event`.
    ///
    /// NOTE(review): presumably blocks the calling thread until an event is available; confirm
    /// against `EventQueue`. Only available with the `std` feature.
    #[cfg(feature = "std")]
    pub(crate) fn wait_next_event(&self) -> LiquidityEvent {
        self.pending_events.wait_next_event()
    }
633
    /// Returns the next pending event, or `None` if the event queue yields none.
    ///
    /// Delegates to `EventQueue::next_event`.
    pub fn next_event(&self) -> Option<LiquidityEvent> {
        self.pending_events.next_event()
    }
646
    /// Asynchronously yields the next event from the event queue.
    ///
    /// Delegates to `EventQueue::next_event_async` and resolves once that future does.
    pub async fn next_event_async(&self) -> LiquidityEvent {
        self.pending_events.next_event_async().await
    }
659
    /// Returns all currently pending events.
    ///
    /// Delegates to `EventQueue::get_and_clear_pending_events`; as the name indicates, the
    /// returned events are removed from the queue.
    pub fn get_and_clear_pending_events(&self) -> Vec<LiquidityEvent> {
        self.pending_events.get_and_clear_pending_events()
    }
672
673 pub async fn persist(&self) -> Result<(), lightning::io::Error> {
678 self.pending_events.persist().await?;
680
681 if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
682 lsps2_service_handler.persist().await?;
683 }
684
685 if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() {
686 lsps5_service_handler.persist().await?;
687 }
688
689 Ok(())
690 }
691
692 fn handle_lsps_message(
693 &self, msg: LSPSMessage, sender_node_id: &PublicKey,
694 ) -> Result<(), lightning::ln::msgs::LightningError> {
695 match msg {
696 LSPSMessage::Invalid(_error) => {
697 return Err(LightningError { err: format!("{} did not understand a message we previously sent, maybe they don't support a protocol we are trying to use?", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Error)});
698 },
699 LSPSMessage::LSPS0(msg @ LSPS0Message::Response(..)) => {
700 self.lsps0_client_handler.handle_message(msg, sender_node_id)?;
701 },
702 LSPSMessage::LSPS0(msg @ LSPS0Message::Request(..)) => {
703 match &self.lsps0_service_handler {
704 Some(lsps0_service_handler) => {
705 lsps0_service_handler.handle_message(msg, sender_node_id)?;
706 },
707 None => {
708 return Err(LightningError { err: format!("Received LSPS0 request message without LSPS0 service handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
709 },
710 }
711 },
712 LSPSMessage::LSPS1(msg @ LSPS1Message::Response(..)) => {
713 match &self.lsps1_client_handler {
714 Some(lsps1_client_handler) => {
715 lsps1_client_handler.handle_message(msg, sender_node_id)?;
716 },
717 None => {
718 return Err(LightningError { err: format!("Received LSPS1 response message without LSPS1 client handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
719 },
720 }
721 },
722 LSPSMessage::LSPS1(_msg @ LSPS1Message::Request(..)) => {
723 #[cfg(lsps1_service)]
724 match &self.lsps1_service_handler {
725 Some(lsps1_service_handler) => {
726 lsps1_service_handler.handle_message(_msg, sender_node_id)?;
727 },
728 None => {
729 return Err(LightningError { err: format!("Received LSPS1 request message without LSPS1 service handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
730 },
731 }
732 #[cfg(not(lsps1_service))]
733 return Err(LightningError { err: format!("Received LSPS1 request message without LSPS1 service handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
734 },
735 LSPSMessage::LSPS2(msg @ LSPS2Message::Response(..)) => {
736 match &self.lsps2_client_handler {
737 Some(lsps2_client_handler) => {
738 lsps2_client_handler.handle_message(msg, sender_node_id)?;
739 },
740 None => {
741 return Err(LightningError { err: format!("Received LSPS2 response message without LSPS2 client handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
742 },
743 }
744 },
745 LSPSMessage::LSPS2(msg @ LSPS2Message::Request(..)) => {
746 match &self.lsps2_service_handler {
747 Some(lsps2_service_handler) => {
748 lsps2_service_handler.handle_message(msg, sender_node_id)?;
749 },
750 None => {
751 return Err(LightningError { err: format!("Received LSPS2 request message without LSPS2 service handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
752 },
753 }
754 },
755 LSPSMessage::LSPS5(msg @ LSPS5Message::Response(..)) => {
756 match &self.lsps5_client_handler {
757 Some(lsps5_client_handler) => {
758 lsps5_client_handler.handle_message(msg, sender_node_id)?;
759 },
760 None => {
761 return Err(LightningError { err: format!("Received LSPS5 response message without LSPS5 client handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
762 },
763 }
764 },
765 LSPSMessage::LSPS5(msg @ LSPS5Message::Request(..)) => {
766 match &self.lsps5_service_handler {
767 Some(lsps5_service_handler) => {
768 if let LSPS5Message::Request(ref req_id, ref req) = msg {
769 if req.is_state_allocating() {
770 let lsps2_has_active_requests = self
771 .lsps2_service_handler
772 .as_ref()
773 .map_or(false, |h| h.has_active_requests(sender_node_id));
774 #[cfg(lsps1_service)]
775 let lsps1_has_active_requests = self
776 .lsps1_service_handler
777 .as_ref()
778 .map_or(false, |h| h.has_active_requests(sender_node_id));
779 #[cfg(not(lsps1_service))]
780 let lsps1_has_active_requests = false;
781
782 lsps5_service_handler.enforce_prior_activity_or_reject(
783 sender_node_id,
784 lsps2_has_active_requests,
785 lsps1_has_active_requests,
786 req_id.clone(),
787 )?
788 }
789 }
790
791 lsps5_service_handler.handle_message(msg, sender_node_id)?;
792 },
793 None => {
794 return Err(LightningError { err: format!("Received LSPS5 request message without LSPS5 service handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
795 },
796 }
797 },
798 }
799 Ok(())
800 }
801}
802
803impl<
804 ES: Deref + Clone,
805 NS: Deref + Clone,
806 CM: Deref + Clone,
807 C: Deref + Clone,
808 K: Deref + Clone,
809 TP: Deref + Clone,
810 T: Deref + Clone,
811 > CustomMessageReader for LiquidityManager<ES, NS, CM, C, K, TP, T>
812where
813 ES::Target: EntropySource,
814 NS::Target: NodeSigner,
815 CM::Target: AChannelManager,
816 C::Target: Filter,
817 K::Target: KVStore,
818 TP::Target: TimeProvider,
819 T::Target: BroadcasterInterface,
820{
821 type CustomMessage = RawLSPSMessage;
822
823 fn read<RD: LengthLimitedRead>(
824 &self, message_type: u16, buffer: &mut RD,
825 ) -> Result<Option<Self::CustomMessage>, lightning::ln::msgs::DecodeError> {
826 match message_type {
827 LSPS_MESSAGE_TYPE_ID => {
828 Ok(Some(RawLSPSMessage::read_from_fixed_length_buffer(buffer)?))
829 },
830 _ => Ok(None),
831 }
832 }
833}
834
835impl<
836 ES: Deref + Clone,
837 NS: Deref + Clone,
838 CM: Deref + Clone,
839 C: Deref + Clone,
840 K: Deref + Clone,
841 TP: Deref + Clone,
842 T: Deref + Clone,
843 > CustomMessageHandler for LiquidityManager<ES, NS, CM, C, K, TP, T>
844where
845 ES::Target: EntropySource,
846 NS::Target: NodeSigner,
847 CM::Target: AChannelManager,
848 C::Target: Filter,
849 K::Target: KVStore,
850 TP::Target: TimeProvider,
851 T::Target: BroadcasterInterface,
852{
853 fn handle_custom_message(
854 &self, msg: Self::CustomMessage, sender_node_id: PublicKey,
855 ) -> Result<(), lightning::ln::msgs::LightningError> {
856 {
857 if self.ignored_peers.read().unwrap().contains(&sender_node_id) {
858 let err = format!("Ignoring message from peer {}.", sender_node_id);
859 return Err(LightningError {
860 err,
861 action: ErrorAction::IgnoreAndLog(Level::Trace),
862 });
863 }
864 }
865
866 let message = {
867 {
868 let mut request_id_to_method_map = self.request_id_to_method_map.lock().unwrap();
869 LSPSMessage::from_str_with_id_map(&msg.payload, &mut request_id_to_method_map)
870 }
871 .map_err(|_| {
872 let mut message_queue_notifier = self.pending_messages.notifier();
873
874 let error = LSPSResponseError {
875 code: JSONRPC_INVALID_MESSAGE_ERROR_CODE,
876 message: JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE.to_string(),
877 data: None,
878 };
879
880 message_queue_notifier.enqueue(&sender_node_id, LSPSMessage::Invalid(error));
881 self.ignored_peers.write().unwrap().insert(sender_node_id);
882 let err = format!(
883 "Failed to deserialize invalid LSPS message. Ignoring peer {} from now on.",
884 sender_node_id
885 );
886 LightningError { err, action: ErrorAction::IgnoreAndLog(Level::Info) }
887 })?
888 };
889
890 self.handle_lsps_message(message, &sender_node_id)
891 }
892
893 fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> {
894 let pending_messages = self.pending_messages.get_and_clear_pending_msgs();
895
896 let mut request_ids_and_methods = pending_messages
897 .iter()
898 .filter_map(|(_, msg)| msg.get_request_id_and_method())
899 .peekable();
900
901 if request_ids_and_methods.peek().is_some() {
902 let mut request_id_to_method_map_lock = self.request_id_to_method_map.lock().unwrap();
903 for (request_id, method) in request_ids_and_methods {
904 request_id_to_method_map_lock.insert(request_id, method);
905 }
906 }
907
908 pending_messages
909 .into_iter()
910 .filter_map(|(public_key, msg)| {
911 serde_json::to_string(&msg)
912 .ok()
913 .map(|payload| (public_key, RawLSPSMessage { payload }))
914 })
915 .collect()
916 }
917
918 fn provided_node_features(&self) -> NodeFeatures {
919 let mut features = NodeFeatures::empty();
920
921 let advertise_service = self.service_config.as_ref().map_or(false, |c| c.advertise_service);
922
923 if advertise_service {
924 features
925 .set_optional_custom_bit(LSPS_FEATURE_BIT)
926 .expect("Failed to set LSPS feature bit");
927 }
928
929 features
930 }
931
932 fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
933 let mut features = InitFeatures::empty();
934
935 let advertise_service = self.service_config.as_ref().map_or(false, |c| c.advertise_service);
936 if advertise_service {
937 features
938 .set_optional_custom_bit(LSPS_FEATURE_BIT)
939 .expect("Failed to set LSPS feature bit");
940 }
941
942 features
943 }
944
945 fn peer_disconnected(&self, counterparty_node_id: bitcoin::secp256k1::PublicKey) {
946 self.ignored_peers.write().unwrap().remove(&counterparty_node_id);
948
949 if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
950 lsps2_service_handler.peer_disconnected(counterparty_node_id);
951 }
952
953 if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() {
954 lsps5_service_handler.peer_disconnected(&counterparty_node_id);
955 }
956 }
957 fn peer_connected(
958 &self, counterparty_node_id: bitcoin::secp256k1::PublicKey, _: &lightning::ln::msgs::Init,
959 _: bool,
960 ) -> Result<(), ()> {
961 if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() {
962 lsps5_service_handler.peer_connected(&counterparty_node_id);
963 }
964
965 Ok(())
966 }
967}
968
969impl<
970 ES: Deref + Clone,
971 NS: Deref + Clone,
972 CM: Deref + Clone,
973 C: Deref + Clone,
974 K: Deref + Clone,
975 TP: Deref + Clone,
976 T: Deref + Clone,
977 > Listen for LiquidityManager<ES, NS, CM, C, K, TP, T>
978where
979 ES::Target: EntropySource,
980 NS::Target: NodeSigner,
981 CM::Target: AChannelManager,
982 C::Target: Filter,
983 K::Target: KVStore,
984 TP::Target: TimeProvider,
985 T::Target: BroadcasterInterface,
986{
987 fn filtered_block_connected(
988 &self, header: &bitcoin::block::Header, txdata: &chain::transaction::TransactionData,
989 height: u32,
990 ) {
991 if let Some(best_block) = self.best_block.read().unwrap().as_ref() {
992 assert_eq!(best_block.block_hash, header.prev_blockhash,
993 "Blocks must be connected in chain-order - the connected header must build on the last connected header");
994 assert_eq!(best_block.height, height - 1,
995 "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
996 }
997
998 self.transactions_confirmed(header, txdata, height);
999 self.best_block_updated(header, height);
1000 }
1001
1002 fn blocks_disconnected(&self, fork_point: BestBlock) {
1003 if let Some(best_block) = self.best_block.write().unwrap().as_mut() {
1004 assert!(best_block.height > fork_point.height,
1005 "Blocks disconnected must indicate disconnection from the current best height, i.e. the new chain tip must be lower than the previous best height");
1006 *best_block = fork_point;
1007 }
1008
1009 }
1013}
1014
1015impl<
1016 ES: Deref + Clone,
1017 NS: Deref + Clone,
1018 CM: Deref + Clone,
1019 C: Deref + Clone,
1020 K: Deref + Clone,
1021 TP: Deref + Clone,
1022 T: Deref + Clone,
1023 > Confirm for LiquidityManager<ES, NS, CM, C, K, TP, T>
1024where
1025 ES::Target: EntropySource,
1026 NS::Target: NodeSigner,
1027 CM::Target: AChannelManager,
1028 C::Target: Filter,
1029 K::Target: KVStore,
1030 TP::Target: TimeProvider,
1031 T::Target: BroadcasterInterface,
1032{
1033 fn transactions_confirmed(
1034 &self, _header: &bitcoin::block::Header, _txdata: &chain::transaction::TransactionData,
1035 _height: u32,
1036 ) {
1037 }
1039
1040 fn transaction_unconfirmed(&self, _txid: &bitcoin::Txid) {
1041 }
1045
1046 fn best_block_updated(&self, header: &bitcoin::block::Header, height: u32) {
1047 let new_best_block = BestBlock::new(header.block_hash(), height);
1048 *self.best_block.write().unwrap() = Some(new_best_block);
1049
1050 }
1052
1053 fn get_relevant_txids(&self) -> Vec<(bitcoin::Txid, u32, Option<bitcoin::BlockHash>)> {
1054 Vec::new()
1056 }
1057}
1058
/// A synchronous wrapper around [`LiquidityManager`] for use with a [`KVStoreSync`] store.
///
/// The sync store is wrapped in a `KVStoreSyncWrapper` and handed to an inner async
/// [`LiquidityManager`], to which the methods of this type delegate.
pub struct LiquidityManagerSync<
    ES: Deref + Clone,
    NS: Deref + Clone,
    CM: Deref + Clone,
    C: Deref + Clone,
    KS: Deref + Clone,
    TP: Deref + Clone,
    T: Deref + Clone,
> where
    ES::Target: EntropySource,
    NS::Target: NodeSigner,
    CM::Target: AChannelManager,
    C::Target: Filter,
    KS::Target: KVStoreSync,
    TP::Target: TimeProvider,
    T::Target: BroadcasterInterface,
{
    // The async manager doing the actual work, operating on the wrapped sync store.
    inner: LiquidityManager<ES, NS, CM, C, KVStoreSyncWrapper<KS>, TP, T>,
}
1080
#[cfg(feature = "time")]
impl<
        ES: Deref + Clone,
        NS: Deref + Clone,
        CM: Deref + Clone,
        C: Deref + Clone,
        KS: Deref + Clone,
        T: Deref + Clone,
    > LiquidityManagerSync<ES, NS, CM, C, KS, DefaultTimeProvider, T>
where
    ES::Target: EntropySource,
    NS::Target: NodeSigner,
    CM::Target: AChannelManager,
    KS::Target: KVStoreSync,
    C::Target: Filter,
    T::Target: BroadcasterInterface,
{
    /// Constructs a [`LiquidityManagerSync`] that uses [`DefaultTimeProvider`] as its time
    /// source. Only available with the `time` feature.
    ///
    /// Wraps `kv_store_sync` in a `KVStoreSyncWrapper` and drives the async
    /// [`LiquidityManager::new`] future to completion with a single poll.
    ///
    /// # Errors
    ///
    /// Returns any error produced by [`LiquidityManager::new`].
    ///
    /// # Panics
    ///
    /// Panics if the construction future is still pending after the single poll.
    /// NOTE(review): this relies on futures over the wrapped sync store resolving
    /// immediately; confirm against `KVStoreSyncWrapper`.
    pub fn new(
        entropy_source: ES, node_signer: NS, channel_manager: CM, chain_source: Option<C>,
        chain_params: Option<ChainParameters>, kv_store_sync: KS, transaction_broadcaster: T,
        service_config: Option<LiquidityServiceConfig>,
        client_config: Option<LiquidityClientConfig>,
    ) -> Result<Self, lightning::io::Error> {
        let wrapped_store = KVStoreSyncWrapper(kv_store_sync);

        let mut construction = Box::pin(LiquidityManager::new(
            entropy_source,
            node_signer,
            channel_manager,
            chain_source,
            chain_params,
            wrapped_store,
            transaction_broadcaster,
            service_config,
            client_config,
        ));

        let waker = dummy_waker();
        let mut poll_ctx = task::Context::from_waker(&waker);
        match construction.as_mut().poll(&mut poll_ctx) {
            task::Poll::Ready(Ok(inner)) => Ok(Self { inner }),
            task::Poll::Ready(Err(err)) => Err(err),
            task::Poll::Pending => {
                unreachable!("LiquidityManager::new should not be pending in a sync context")
            },
        }
    }
}
1133
1134impl<
1135 ES: Deref + Clone,
1136 NS: Deref + Clone,
1137 CM: Deref + Clone,
1138 C: Deref + Clone,
1139 KS: Deref + Clone,
1140 TP: Deref + Clone,
1141 T: Deref + Clone,
1142 > LiquidityManagerSync<ES, NS, CM, C, KS, TP, T>
1143where
1144 ES::Target: EntropySource,
1145 NS::Target: NodeSigner,
1146 CM::Target: AChannelManager,
1147 C::Target: Filter,
1148 KS::Target: KVStoreSync,
1149 TP::Target: TimeProvider,
1150 T::Target: BroadcasterInterface,
1151{
1152 pub fn new_with_custom_time_provider(
1156 entropy_source: ES, node_signer: NS, channel_manager: CM, chain_source: Option<C>,
1157 chain_params: Option<ChainParameters>, kv_store_sync: KS, transaction_broadcaster: T,
1158 service_config: Option<LiquidityServiceConfig>,
1159 client_config: Option<LiquidityClientConfig>, time_provider: TP,
1160 ) -> Result<Self, lightning::io::Error> {
1161 let kv_store = KVStoreSyncWrapper(kv_store_sync);
1162 let mut fut = Box::pin(LiquidityManager::new_with_custom_time_provider(
1163 entropy_source,
1164 node_signer,
1165 channel_manager,
1166 transaction_broadcaster,
1167 chain_source,
1168 chain_params,
1169 kv_store,
1170 service_config,
1171 client_config,
1172 time_provider,
1173 ));
1174
1175 let mut waker = dummy_waker();
1176 let mut ctx = task::Context::from_waker(&mut waker);
1177 let inner = match fut.as_mut().poll(&mut ctx) {
1178 task::Poll::Ready(result) => result,
1179 task::Poll::Pending => {
1180 unreachable!("LiquidityManager::new should not be pending in a sync context");
1182 },
1183 }?;
1184 Ok(Self { inner })
1185 }
1186
1187 pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler<ES, KVStoreSyncWrapper<KS>> {
1191 self.inner.lsps0_client_handler()
1192 }
1193
1194 pub fn lsps0_service_handler(&self) -> Option<&LSPS0ServiceHandler> {
1198 self.inner.lsps0_service_handler()
1199 }
1200
1201 pub fn lsps1_client_handler(&self) -> Option<&LSPS1ClientHandler<ES, KVStoreSyncWrapper<KS>>> {
1205 self.inner.lsps1_client_handler()
1206 }
1207
1208 #[cfg(lsps1_service)]
1212 pub fn lsps1_service_handler(
1213 &self,
1214 ) -> Option<&LSPS1ServiceHandler<ES, CM, C, KVStoreSyncWrapper<KS>>> {
1215 self.inner.lsps1_service_handler()
1216 }
1217
1218 pub fn lsps2_client_handler(&self) -> Option<&LSPS2ClientHandler<ES, KVStoreSyncWrapper<KS>>> {
1222 self.inner.lsps2_client_handler()
1223 }
1224
1225 pub fn lsps2_service_handler<'a>(
1229 &'a self,
1230 ) -> Option<LSPS2ServiceHandlerSync<'a, CM, KVStoreSyncWrapper<KS>, T>> {
1231 self.inner.lsps2_service_handler.as_ref().map(|r| LSPS2ServiceHandlerSync::from_inner(r))
1232 }
1233
1234 pub fn lsps5_client_handler(&self) -> Option<&LSPS5ClientHandler<ES, KVStoreSyncWrapper<KS>>> {
1238 self.inner.lsps5_client_handler()
1239 }
1240
1241 pub fn lsps5_service_handler(
1245 &self,
1246 ) -> Option<&LSPS5ServiceHandler<CM, NS, KVStoreSyncWrapper<KS>, TP>> {
1247 self.inner.lsps5_service_handler()
1248 }
1249
1250 pub fn get_pending_msgs_or_needs_persist_future(&self) -> Future {
1255 self.inner.get_pending_msgs_or_needs_persist_future()
1256 }
1257
1258 #[cfg(feature = "std")]
1268 pub fn wait_next_event(&self) -> LiquidityEvent {
1269 self.inner.wait_next_event()
1270 }
1271
1272 pub fn next_event(&self) -> Option<LiquidityEvent> {
1276 self.inner.next_event()
1277 }
1278
1279 pub fn get_and_clear_pending_events(&self) -> Vec<LiquidityEvent> {
1283 self.inner.get_and_clear_pending_events()
1284 }
1285
1286 pub fn persist(&self) -> Result<(), lightning::io::Error> {
1290 let mut waker = dummy_waker();
1291 let mut ctx = task::Context::from_waker(&mut waker);
1292 match Box::pin(self.inner.persist()).as_mut().poll(&mut ctx) {
1293 task::Poll::Ready(result) => result,
1294 task::Poll::Pending => {
1295 unreachable!("LiquidityManager::persist should not be pending in a sync context");
1297 },
1298 }
1299 }
1300}
1301
1302impl<
1303 ES: Deref + Clone,
1304 NS: Deref + Clone,
1305 CM: Deref + Clone,
1306 C: Deref + Clone,
1307 KS: Deref + Clone,
1308 TP: Deref + Clone,
1309 T: Deref + Clone,
1310 > CustomMessageReader for LiquidityManagerSync<ES, NS, CM, C, KS, TP, T>
1311where
1312 ES::Target: EntropySource,
1313 NS::Target: NodeSigner,
1314 CM::Target: AChannelManager,
1315 C::Target: Filter,
1316 KS::Target: KVStoreSync,
1317 TP::Target: TimeProvider,
1318 T::Target: BroadcasterInterface,
1319{
1320 type CustomMessage = RawLSPSMessage;
1321
1322 fn read<RD: LengthLimitedRead>(
1323 &self, message_type: u16, buffer: &mut RD,
1324 ) -> Result<Option<Self::CustomMessage>, lightning::ln::msgs::DecodeError> {
1325 self.inner.read(message_type, buffer)
1326 }
1327}
1328
1329impl<
1330 ES: Deref + Clone,
1331 NS: Deref + Clone,
1332 CM: Deref + Clone,
1333 C: Deref + Clone,
1334 KS: Deref + Clone,
1335 TP: Deref + Clone,
1336 T: Deref + Clone,
1337 > CustomMessageHandler for LiquidityManagerSync<ES, NS, CM, C, KS, TP, T>
1338where
1339 ES::Target: EntropySource,
1340 NS::Target: NodeSigner,
1341 CM::Target: AChannelManager,
1342 C::Target: Filter,
1343 KS::Target: KVStoreSync,
1344 TP::Target: TimeProvider,
1345 T::Target: BroadcasterInterface,
1346{
1347 fn handle_custom_message(
1348 &self, msg: Self::CustomMessage, sender_node_id: PublicKey,
1349 ) -> Result<(), lightning::ln::msgs::LightningError> {
1350 self.inner.handle_custom_message(msg, sender_node_id)
1351 }
1352
1353 fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> {
1354 self.inner.get_and_clear_pending_msg()
1355 }
1356
1357 fn provided_node_features(&self) -> NodeFeatures {
1358 self.inner.provided_node_features()
1359 }
1360
1361 fn provided_init_features(&self, their_node_id: PublicKey) -> InitFeatures {
1362 self.inner.provided_init_features(their_node_id)
1363 }
1364
1365 fn peer_disconnected(&self, counterparty_node_id: bitcoin::secp256k1::PublicKey) {
1366 self.inner.peer_disconnected(counterparty_node_id)
1367 }
1368 fn peer_connected(
1369 &self, counterparty_node_id: bitcoin::secp256k1::PublicKey,
1370 init_msg: &lightning::ln::msgs::Init, inbound: bool,
1371 ) -> Result<(), ()> {
1372 self.inner.peer_connected(counterparty_node_id, init_msg, inbound)
1373 }
1374}
1375
1376impl<
1377 ES: Deref + Clone,
1378 NS: Deref + Clone,
1379 CM: Deref + Clone,
1380 C: Deref + Clone,
1381 KS: Deref + Clone,
1382 TP: Deref + Clone,
1383 T: Deref + Clone,
1384 > Listen for LiquidityManagerSync<ES, NS, CM, C, KS, TP, T>
1385where
1386 ES::Target: EntropySource,
1387 NS::Target: NodeSigner,
1388 CM::Target: AChannelManager,
1389 C::Target: Filter,
1390 KS::Target: KVStoreSync,
1391 TP::Target: TimeProvider,
1392 T::Target: BroadcasterInterface,
1393{
1394 fn filtered_block_connected(
1395 &self, header: &bitcoin::block::Header, txdata: &chain::transaction::TransactionData,
1396 height: u32,
1397 ) {
1398 self.inner.filtered_block_connected(header, txdata, height)
1399 }
1400
1401 fn blocks_disconnected(&self, fork_point: BestBlock) {
1402 self.inner.blocks_disconnected(fork_point);
1403 }
1404}
1405
1406impl<
1407 ES: Deref + Clone,
1408 NS: Deref + Clone,
1409 CM: Deref + Clone,
1410 C: Deref + Clone,
1411 KS: Deref + Clone,
1412 TP: Deref + Clone,
1413 T: Deref + Clone,
1414 > Confirm for LiquidityManagerSync<ES, NS, CM, C, KS, TP, T>
1415where
1416 ES::Target: EntropySource,
1417 NS::Target: NodeSigner,
1418 CM::Target: AChannelManager,
1419 C::Target: Filter,
1420 KS::Target: KVStoreSync,
1421 TP::Target: TimeProvider,
1422 T::Target: BroadcasterInterface,
1423{
1424 fn transactions_confirmed(
1425 &self, header: &bitcoin::block::Header, txdata: &chain::transaction::TransactionData,
1426 height: u32,
1427 ) {
1428 self.inner.transactions_confirmed(header, txdata, height)
1429 }
1430
1431 fn transaction_unconfirmed(&self, txid: &bitcoin::Txid) {
1432 self.inner.transaction_unconfirmed(txid)
1433 }
1434
1435 fn best_block_updated(&self, header: &bitcoin::block::Header, height: u32) {
1436 self.inner.best_block_updated(header, height)
1437 }
1438
1439 fn get_relevant_txids(&self) -> Vec<(bitcoin::Txid, u32, Option<bitcoin::BlockHash>)> {
1440 self.inner.get_relevant_txids()
1441 }
1442}