lightning_liquidity/
manager.rs

1// This file is Copyright its original authors, visible in version control
2// history.
3//
4// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7// You may not use this file except in accordance with one or both of these
8// licenses.
9
10use alloc::boxed::Box;
11use alloc::string::ToString;
12use alloc::vec::Vec;
13
14use crate::events::{EventQueue, LiquidityEvent};
15use crate::lsps0::client::LSPS0ClientHandler;
16use crate::lsps0::msgs::LSPS0Message;
17use crate::lsps0::ser::{
18	LSPSMessage, LSPSMethod, LSPSProtocolMessageHandler, LSPSRequestId, LSPSResponseError,
19	RawLSPSMessage, JSONRPC_INVALID_MESSAGE_ERROR_CODE, JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE,
20	LSPS_MESSAGE_TYPE_ID,
21};
22use crate::lsps0::service::LSPS0ServiceHandler;
23use crate::lsps5::client::{LSPS5ClientConfig, LSPS5ClientHandler};
24use crate::lsps5::msgs::LSPS5Message;
25use crate::lsps5::service::{LSPS5ServiceConfig, LSPS5ServiceHandler};
26use crate::message_queue::MessageQueue;
27use crate::persist::{
28	read_event_queue, read_lsps2_service_peer_states, read_lsps5_service_peer_states,
29};
30
31use crate::lsps1::client::{LSPS1ClientConfig, LSPS1ClientHandler};
32use crate::lsps1::msgs::LSPS1Message;
33#[cfg(lsps1_service)]
34use crate::lsps1::service::{LSPS1ServiceConfig, LSPS1ServiceHandler};
35
36use crate::lsps2::client::{LSPS2ClientConfig, LSPS2ClientHandler};
37use crate::lsps2::msgs::LSPS2Message;
38use crate::lsps2::service::{LSPS2ServiceConfig, LSPS2ServiceHandler, LSPS2ServiceHandlerSync};
39use crate::prelude::{new_hash_map, new_hash_set, HashMap, HashSet};
40use crate::sync::{Arc, Mutex, RwLock};
41use crate::utils::async_poll::dummy_waker;
42#[cfg(feature = "time")]
43use crate::utils::time::DefaultTimeProvider;
44use crate::utils::time::TimeProvider;
45
46use lightning::chain::chaininterface::BroadcasterInterface;
47use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
48use lightning::ln::channelmanager::{AChannelManager, ChainParameters};
49use lightning::ln::msgs::{ErrorAction, LightningError};
50use lightning::ln::peer_handler::CustomMessageHandler;
51use lightning::ln::wire::CustomMessageReader;
52use lightning::sign::{EntropySource, NodeSigner};
53use lightning::util::logger::Level;
54use lightning::util::persist::{KVStore, KVStoreSync, KVStoreSyncWrapper};
55use lightning::util::ser::{LengthLimitedRead, LengthReadable};
56use lightning::util::wakers::{Future, Notifier};
57
58use lightning_types::features::{InitFeatures, NodeFeatures};
59
60use bitcoin::secp256k1::PublicKey;
61
62use core::future::Future as StdFuture;
63use core::ops::Deref;
64use core::task;
65
/// Feature bit number associated with LSPS support.
// NOTE(review): presumably this is the bit set in the init / node announcement features when
// `LiquidityServiceConfig::advertise_service` is enabled; the usage is not visible here — confirm.
const LSPS_FEATURE_BIT: usize = 729;
67
/// A server-side configuration for [`LiquidityManager`].
///
/// Allows end-users to configure options when using the [`LiquidityManager`]
/// to provide liquidity services to clients.
#[derive(Clone)]
pub struct LiquidityServiceConfig {
	/// Optional server-side configuration for LSPS1 channel requests.
	#[cfg(lsps1_service)]
	pub lsps1_service_config: Option<LSPS1ServiceConfig>,
	/// Optional server-side configuration for JIT channels
	/// should you want to support them.
	pub lsps2_service_config: Option<LSPS2ServiceConfig>,
	/// Optional server-side configuration for LSPS5 webhook service.
	pub lsps5_service_config: Option<LSPS5ServiceConfig>,
	/// Controls whether the liquidity service should be advertised via setting the feature bit in
	/// the node announcement and the init message.
	pub advertise_service: bool,
}
86
/// A client-side configuration for [`LiquidityManager`].
///
/// Allows end-users to configure options when using the [`LiquidityManager`]
/// to access liquidity services from a provider.
///
/// Each field is optional; leaving a field as `None` means no client-side handler is set up for
/// the corresponding protocol.
#[derive(Clone)]
pub struct LiquidityClientConfig {
	/// Optional client-side configuration for LSPS1 channel requests.
	pub lsps1_client_config: Option<LSPS1ClientConfig>,
	/// Optional client-side configuration for JIT channels.
	pub lsps2_client_config: Option<LSPS2ClientConfig>,
	/// Optional client-side configuration for LSPS5 webhook service.
	pub lsps5_client_config: Option<LSPS5ClientConfig>,
}
100
/// A trivial trait which describes any [`LiquidityManager`].
///
/// Every generic parameter of [`LiquidityManager`] is exposed as a pair of associated types: the
/// pointer-like type itself and the type it dereferences to.
///
/// This is not exported to bindings users as general cover traits aren't useful in other
/// languages.
pub trait ALiquidityManager {
	/// A type implementing [`EntropySource`].
	type EntropySource: EntropySource + ?Sized;
	/// A type that may be dereferenced to [`Self::EntropySource`].
	type ES: Deref<Target = Self::EntropySource> + Clone;
	/// A type implementing [`NodeSigner`].
	type NodeSigner: NodeSigner + ?Sized;
	/// A type that may be dereferenced to [`Self::NodeSigner`].
	type NS: Deref<Target = Self::NodeSigner> + Clone;
	/// A type implementing [`AChannelManager`].
	type AChannelManager: AChannelManager + ?Sized;
	/// A type that may be dereferenced to [`Self::AChannelManager`].
	type CM: Deref<Target = Self::AChannelManager> + Clone;
	/// A type implementing [`Filter`].
	type Filter: Filter + ?Sized;
	/// A type that may be dereferenced to [`Self::Filter`].
	type C: Deref<Target = Self::Filter> + Clone;
	/// A type implementing [`KVStore`].
	type KVStore: KVStore + ?Sized;
	/// A type that may be dereferenced to [`Self::KVStore`].
	type K: Deref<Target = Self::KVStore> + Clone;
	/// A type implementing [`TimeProvider`].
	type TimeProvider: TimeProvider + ?Sized;
	/// A type that may be dereferenced to [`Self::TimeProvider`].
	type TP: Deref<Target = Self::TimeProvider> + Clone;
	/// A type implementing [`BroadcasterInterface`].
	type BroadcasterInterface: BroadcasterInterface + ?Sized;
	/// A type that may be dereferenced to [`Self::BroadcasterInterface`].
	type T: Deref<Target = Self::BroadcasterInterface> + Clone;
	/// Returns a reference to the actual [`LiquidityManager`] object.
	fn get_lm(
		&self,
	) -> &LiquidityManager<Self::ES, Self::NS, Self::CM, Self::C, Self::K, Self::TP, Self::T>;
}
139
140impl<
141		ES: Deref + Clone,
142		NS: Deref + Clone,
143		CM: Deref + Clone,
144		C: Deref + Clone,
145		K: Deref + Clone,
146		TP: Deref + Clone,
147		T: Deref + Clone,
148	> ALiquidityManager for LiquidityManager<ES, NS, CM, C, K, TP, T>
149where
150	ES::Target: EntropySource,
151	NS::Target: NodeSigner,
152	CM::Target: AChannelManager,
153	C::Target: Filter,
154	K::Target: KVStore,
155	TP::Target: TimeProvider,
156	T::Target: BroadcasterInterface,
157{
158	type EntropySource = ES::Target;
159	type ES = ES;
160	type NodeSigner = NS::Target;
161	type NS = NS;
162	type AChannelManager = CM::Target;
163	type CM = CM;
164	type Filter = C::Target;
165	type C = C;
166	type KVStore = K::Target;
167	type K = K;
168	type TimeProvider = TP::Target;
169	type TP = TP;
170	type BroadcasterInterface = T::Target;
171	type T = T;
172	fn get_lm(&self) -> &LiquidityManager<ES, NS, CM, C, K, TP, T> {
173		self
174	}
175}
176
/// A trivial trait which describes any [`LiquidityManagerSync`].
///
/// Every generic parameter of [`LiquidityManagerSync`] is exposed as a pair of associated types:
/// the pointer-like type itself and the type it dereferences to.
///
/// This is not exported to bindings users as general cover traits aren't useful in other
/// languages.
pub trait ALiquidityManagerSync {
	/// A type implementing [`EntropySource`].
	type EntropySource: EntropySource + ?Sized;
	/// A type that may be dereferenced to [`Self::EntropySource`].
	type ES: Deref<Target = Self::EntropySource> + Clone;
	/// A type implementing [`NodeSigner`].
	type NodeSigner: NodeSigner + ?Sized;
	/// A type that may be dereferenced to [`Self::NodeSigner`].
	type NS: Deref<Target = Self::NodeSigner> + Clone;
	/// A type implementing [`AChannelManager`].
	type AChannelManager: AChannelManager + ?Sized;
	/// A type that may be dereferenced to [`Self::AChannelManager`].
	type CM: Deref<Target = Self::AChannelManager> + Clone;
	/// A type implementing [`Filter`].
	type Filter: Filter + ?Sized;
	/// A type that may be dereferenced to [`Self::Filter`].
	type C: Deref<Target = Self::Filter> + Clone;
	/// A type implementing [`KVStoreSync`].
	type KVStoreSync: KVStoreSync + ?Sized;
	/// A type that may be dereferenced to [`Self::KVStoreSync`].
	type KS: Deref<Target = Self::KVStoreSync> + Clone;
	/// A type implementing [`TimeProvider`].
	type TimeProvider: TimeProvider + ?Sized;
	/// A type that may be dereferenced to [`Self::TimeProvider`].
	type TP: Deref<Target = Self::TimeProvider> + Clone;
	/// A type implementing [`BroadcasterInterface`].
	type BroadcasterInterface: BroadcasterInterface + ?Sized;
	/// A type that may be dereferenced to [`Self::BroadcasterInterface`].
	type T: Deref<Target = Self::BroadcasterInterface> + Clone;
	/// Returns the inner async [`LiquidityManager`] for testing purposes.
	///
	/// Only available in test builds or with the `_test_utils` feature.
	#[cfg(any(test, feature = "_test_utils"))]
	fn get_lm_async(
		&self,
	) -> &LiquidityManager<
		Self::ES,
		Self::NS,
		Self::CM,
		Self::C,
		KVStoreSyncWrapper<Self::KS>,
		Self::TP,
		Self::T,
	>;
	/// Returns a reference to the actual [`LiquidityManagerSync`] object.
	fn get_lm(
		&self,
	) -> &LiquidityManagerSync<Self::ES, Self::NS, Self::CM, Self::C, Self::KS, Self::TP, Self::T>;
}
228
229impl<
230		ES: Deref + Clone,
231		NS: Deref + Clone,
232		CM: Deref + Clone,
233		C: Deref + Clone,
234		KS: Deref + Clone,
235		TP: Deref + Clone,
236		T: Deref + Clone,
237	> ALiquidityManagerSync for LiquidityManagerSync<ES, NS, CM, C, KS, TP, T>
238where
239	ES::Target: EntropySource,
240	NS::Target: NodeSigner,
241	CM::Target: AChannelManager,
242	C::Target: Filter,
243	KS::Target: KVStoreSync,
244	TP::Target: TimeProvider,
245	T::Target: BroadcasterInterface,
246{
247	type EntropySource = ES::Target;
248	type ES = ES;
249	type NodeSigner = NS::Target;
250	type NS = NS;
251	type AChannelManager = CM::Target;
252	type CM = CM;
253	type Filter = C::Target;
254	type C = C;
255	type KVStoreSync = KS::Target;
256	type KS = KS;
257	type TimeProvider = TP::Target;
258	type TP = TP;
259	type BroadcasterInterface = T::Target;
260	type T = T;
261	/// Returns the inner async [`LiquidityManager`] for testing purposes.
262	#[cfg(any(test, feature = "_test_utils"))]
263	fn get_lm_async(
264		&self,
265	) -> &LiquidityManager<
266		Self::ES,
267		Self::NS,
268		Self::CM,
269		Self::C,
270		KVStoreSyncWrapper<Self::KS>,
271		Self::TP,
272		Self::T,
273	> {
274		&self.inner
275	}
276	fn get_lm(&self) -> &LiquidityManagerSync<ES, NS, CM, C, KS, TP, T> {
277		self
278	}
279}
280
/// The main interface into LSP functionality.
///
/// Should be used as a [`CustomMessageHandler`] for your [`PeerManager`]'s [`MessageHandler`].
///
/// Users need to continually poll [`LiquidityManager::get_and_clear_pending_events`] in order to surface
/// [`LiquidityEvent`]'s that likely need to be handled.
///
/// If the LSPS2 service is configured, users must forward the following parameters from LDK events:
/// - [`Event::HTLCIntercepted`] to [`LSPS2ServiceHandler::htlc_intercepted`]
/// - [`Event::ChannelReady`] to [`LSPS2ServiceHandler::channel_ready`]
/// - [`Event::HTLCHandlingFailed`] to [`LSPS2ServiceHandler::htlc_handling_failed`]
/// - [`Event::PaymentForwarded`] to [`LSPS2ServiceHandler::payment_forwarded`]
///
/// [`PeerManager`]: lightning::ln::peer_handler::PeerManager
/// [`MessageHandler`]: lightning::ln::peer_handler::MessageHandler
/// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted
/// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady
/// [`Event::HTLCHandlingFailed`]: lightning::events::Event::HTLCHandlingFailed
/// [`Event::PaymentForwarded`]: lightning::events::Event::PaymentForwarded
pub struct LiquidityManager<
	ES: Deref + Clone,
	NS: Deref + Clone,
	CM: Deref + Clone,
	C: Deref + Clone,
	K: Deref + Clone,
	TP: Deref + Clone,
	T: Deref + Clone,
> where
	ES::Target: EntropySource,
	NS::Target: NodeSigner,
	CM::Target: AChannelManager,
	C::Target: Filter,
	K::Target: KVStore,
	TP::Target: TimeProvider,
	T::Target: BroadcasterInterface,
{
	// Message queue shared with all protocol handlers.
	pending_messages: Arc<MessageQueue>,
	// Event queue shared with all protocol handlers; seeded from the persisted queue on
	// construction.
	pending_events: Arc<EventQueue<K>>,
	// Handed to `LSPSMessage::from_str_with_id_map` when parsing incoming messages.
	request_id_to_method_map: Mutex<HashMap<LSPSRequestId, LSPSMethod>>,
	// We ignore peers if they send us bogus data.
	ignored_peers: RwLock<HashSet<PublicKey>>,
	// The LSPS0 client handler is always present; all other handlers are only set up if the
	// corresponding configuration was provided.
	lsps0_client_handler: LSPS0ClientHandler<ES, K>,
	lsps0_service_handler: Option<LSPS0ServiceHandler>,
	#[cfg(lsps1_service)]
	lsps1_service_handler: Option<LSPS1ServiceHandler<ES, CM, C, K>>,
	lsps1_client_handler: Option<LSPS1ClientHandler<ES, K>>,
	lsps2_service_handler: Option<LSPS2ServiceHandler<CM, K, T>>,
	lsps2_client_handler: Option<LSPS2ClientHandler<ES, K>>,
	lsps5_service_handler: Option<LSPS5ServiceHandler<CM, NS, K, TP>>,
	lsps5_client_handler: Option<LSPS5ClientHandler<ES, K>>,
	// The configurations the manager was constructed with.
	service_config: Option<LiquidityServiceConfig>,
	_client_config: Option<LiquidityClientConfig>,
	// Initialized from the `ChainParameters` given at construction, if any.
	best_block: RwLock<Option<BestBlock>>,
	_chain_source: Option<C>,
	// Backs `get_pending_msgs_or_needs_persist_future`; also shared with the message and event
	// queues.
	pending_msgs_or_needs_persist_notifier: Arc<Notifier>,
}
337
#[cfg(feature = "time")]
impl<ES, NS, CM, C, K, T> LiquidityManager<ES, NS, CM, C, K, DefaultTimeProvider, T>
where
	ES: Deref + Clone,
	ES::Target: EntropySource,
	NS: Deref + Clone,
	NS::Target: NodeSigner,
	CM: Deref + Clone,
	CM::Target: AChannelManager,
	C: Deref + Clone,
	C::Target: Filter,
	K: Deref + Clone,
	K::Target: KVStore,
	T: Deref + Clone,
	T::Target: BroadcasterInterface,
{
	/// Creates a [`LiquidityManager`] that uses the [`DefaultTimeProvider`] as its clock.
	///
	/// This is a thin wrapper around [`Self::new_with_custom_time_provider`], which reads the
	/// persisted service states from the given [`KVStore`].
	///
	/// Returns an error if the underlying constructor fails.
	pub async fn new(
		entropy_source: ES, node_signer: NS, channel_manager: CM, chain_source: Option<C>,
		chain_params: Option<ChainParameters>, kv_store: K, transaction_broadcaster: T,
		service_config: Option<LiquidityServiceConfig>,
		client_config: Option<LiquidityClientConfig>,
	) -> Result<Self, lightning::io::Error> {
		let time_provider = DefaultTimeProvider;

		// Note the differing parameter order: the broadcaster comes right after the channel
		// manager in the delegate's signature.
		let manager = Self::new_with_custom_time_provider(
			entropy_source,
			node_signer,
			channel_manager,
			transaction_broadcaster,
			chain_source,
			chain_params,
			kv_store,
			service_config,
			client_config,
			time_provider,
		)
		.await?;

		Ok(manager)
	}
}
379
380impl<
381		ES: Deref + Clone,
382		NS: Deref + Clone,
383		CM: Deref + Clone,
384		C: Deref + Clone,
385		K: Deref + Clone,
386		TP: Deref + Clone,
387		T: Deref + Clone,
388	> LiquidityManager<ES, NS, CM, C, K, TP, T>
389where
390	ES::Target: EntropySource,
391	NS::Target: NodeSigner,
392	CM::Target: AChannelManager,
393	C::Target: Filter,
394	K::Target: KVStore,
395	TP::Target: TimeProvider,
396	T::Target: BroadcasterInterface,
397{
398	/// Constructor for the [`LiquidityManager`] with a custom time provider.
399	///
400	/// Will read persisted service states from the given [`KVStore`].
401	///
402	/// This should be used on non-std platforms where access to the system time is not
403	/// available.
404	/// Sets up the required protocol message handlers based on the given
405	/// [`LiquidityClientConfig`] and [`LiquidityServiceConfig`].
406	pub async fn new_with_custom_time_provider(
407		entropy_source: ES, node_signer: NS, channel_manager: CM, transaction_broadcaster: T,
408		chain_source: Option<C>, chain_params: Option<ChainParameters>, kv_store: K,
409		service_config: Option<LiquidityServiceConfig>,
410		client_config: Option<LiquidityClientConfig>, time_provider: TP,
411	) -> Result<Self, lightning::io::Error> {
412		let pending_msgs_or_needs_persist_notifier = Arc::new(Notifier::new());
413		let pending_messages =
414			Arc::new(MessageQueue::new(Arc::clone(&pending_msgs_or_needs_persist_notifier)));
415		let persisted_queue = read_event_queue(kv_store.clone()).await?.unwrap_or_default();
416		let pending_events = Arc::new(EventQueue::new(
417			persisted_queue,
418			kv_store.clone(),
419			Arc::clone(&pending_msgs_or_needs_persist_notifier),
420		));
421		let ignored_peers = RwLock::new(new_hash_set());
422
423		let mut supported_protocols = Vec::new();
424
425		let lsps2_client_handler = client_config.as_ref().and_then(|config| {
426			config.lsps2_client_config.map(|config| {
427				LSPS2ClientHandler::new(
428					entropy_source.clone(),
429					Arc::clone(&pending_messages),
430					Arc::clone(&pending_events),
431					config.clone(),
432				)
433			})
434		});
435
436		let lsps2_service_handler = if let Some(service_config) = service_config.as_ref() {
437			if let Some(lsps2_service_config) = service_config.lsps2_service_config.as_ref() {
438				if let Some(number) =
439					<LSPS2ServiceHandler<CM, K, T> as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER
440				{
441					supported_protocols.push(number);
442				}
443
444				let peer_states = read_lsps2_service_peer_states(kv_store.clone()).await?;
445				Some(LSPS2ServiceHandler::new(
446					peer_states,
447					Arc::clone(&pending_messages),
448					Arc::clone(&pending_events),
449					channel_manager.clone(),
450					kv_store.clone(),
451					transaction_broadcaster.clone(),
452					lsps2_service_config.clone(),
453				)?)
454			} else {
455				None
456			}
457		} else {
458			None
459		};
460
461		let lsps5_client_handler = client_config.as_ref().and_then(|config| {
462			config.lsps5_client_config.as_ref().map(|config| {
463				LSPS5ClientHandler::new(
464					entropy_source.clone(),
465					Arc::clone(&pending_messages),
466					Arc::clone(&pending_events),
467					config.clone(),
468				)
469			})
470		});
471
472		let lsps5_service_handler = if let Some(service_config) = service_config.as_ref() {
473			if let Some(lsps5_service_config) = service_config.lsps5_service_config.as_ref() {
474				if let Some(number) =
475					<LSPS5ServiceHandler<CM, NS, K, TP> as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER
476				{
477					supported_protocols.push(number);
478				}
479
480				let peer_states = read_lsps5_service_peer_states(kv_store.clone()).await?;
481				Some(LSPS5ServiceHandler::new_with_time_provider(
482					peer_states,
483					Arc::clone(&pending_events),
484					Arc::clone(&pending_messages),
485					channel_manager.clone(),
486					kv_store.clone(),
487					node_signer,
488					lsps5_service_config.clone(),
489					time_provider,
490				))
491			} else {
492				None
493			}
494		} else {
495			None
496		};
497
498		let lsps1_client_handler = client_config.as_ref().and_then(|config| {
499			config.lsps1_client_config.as_ref().map(|config| {
500				LSPS1ClientHandler::new(
501					entropy_source.clone(),
502					Arc::clone(&pending_messages),
503					Arc::clone(&pending_events),
504					config.clone(),
505				)
506			})
507		});
508
509		#[cfg(lsps1_service)]
510		let lsps1_service_handler = service_config.as_ref().and_then(|config| {
511			if let Some(number) =
512				<LSPS1ServiceHandler<ES, CM, C, K> as LSPSProtocolMessageHandler>::PROTOCOL_NUMBER
513			{
514				supported_protocols.push(number);
515			}
516			config.lsps1_service_config.as_ref().map(|config| {
517				LSPS1ServiceHandler::new(
518					entropy_source.clone(),
519					Arc::clone(&pending_messages),
520					Arc::clone(&pending_events),
521					channel_manager.clone(),
522					chain_source.clone(),
523					config.clone(),
524				)
525			})
526		});
527
528		let lsps0_client_handler = LSPS0ClientHandler::new(
529			entropy_source.clone(),
530			Arc::clone(&pending_messages),
531			Arc::clone(&pending_events),
532		);
533
534		let lsps0_service_handler = if service_config.is_some() {
535			Some(LSPS0ServiceHandler::new(supported_protocols, Arc::clone(&pending_messages)))
536		} else {
537			None
538		};
539
540		Ok(Self {
541			pending_messages,
542			pending_events,
543			request_id_to_method_map: Mutex::new(new_hash_map()),
544			ignored_peers,
545			lsps0_client_handler,
546			lsps0_service_handler,
547			lsps1_client_handler,
548			#[cfg(lsps1_service)]
549			lsps1_service_handler,
550			lsps2_client_handler,
551			lsps2_service_handler,
552			lsps5_client_handler,
553			lsps5_service_handler,
554			service_config,
555			_client_config: client_config,
556			best_block: RwLock::new(chain_params.map(|chain_params| chain_params.best_block)),
557			_chain_source: chain_source,
558			pending_msgs_or_needs_persist_notifier,
559		})
560	}
561
562	/// Returns a reference to the LSPS0 client-side handler.
563	pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler<ES, K> {
564		&self.lsps0_client_handler
565	}
566
567	/// Returns a reference to the LSPS0 server-side handler.
568	pub fn lsps0_service_handler(&self) -> Option<&LSPS0ServiceHandler> {
569		self.lsps0_service_handler.as_ref()
570	}
571
572	/// Returns a reference to the LSPS1 client-side handler.
573	///
574	/// The returned handler allows to initiate the LSPS1 client-side flow, i.e., allows to request
575	/// channels from the configured LSP.
576	pub fn lsps1_client_handler(&self) -> Option<&LSPS1ClientHandler<ES, K>> {
577		self.lsps1_client_handler.as_ref()
578	}
579
580	/// Returns a reference to the LSPS1 server-side handler.
581	#[cfg(lsps1_service)]
582	pub fn lsps1_service_handler(&self) -> Option<&LSPS1ServiceHandler<ES, CM, C, K>> {
583		self.lsps1_service_handler.as_ref()
584	}
585
586	/// Returns a reference to the LSPS2 client-side handler.
587	///
588	/// The returned handler allows to initiate the LSPS2 client-side flow. That is, it allows to
589	/// retrieve all necessary data to create 'just-in-time' invoices that, when paid, will have
590	/// the configured LSP open a 'just-in-time' channel.
591	pub fn lsps2_client_handler(&self) -> Option<&LSPS2ClientHandler<ES, K>> {
592		self.lsps2_client_handler.as_ref()
593	}
594
595	/// Returns a reference to the LSPS2 server-side handler.
596	///
597	/// The returned hendler allows to initiate the LSPS2 service-side flow.
598	pub fn lsps2_service_handler(&self) -> Option<&LSPS2ServiceHandler<CM, K, T>> {
599		self.lsps2_service_handler.as_ref()
600	}
601
602	/// Returns a reference to the LSPS5 client-side handler.
603	///
604	/// The returned handler allows to initiate the LSPS5 client-side flow. That is, it allows to
605	pub fn lsps5_client_handler(&self) -> Option<&LSPS5ClientHandler<ES, K>> {
606		self.lsps5_client_handler.as_ref()
607	}
608
609	/// Returns a reference to the LSPS5 server-side handler.
610	///
611	/// The returned handler allows to initiate the LSPS5 service-side flow.
612	pub fn lsps5_service_handler(&self) -> Option<&LSPS5ServiceHandler<CM, NS, K, TP>> {
613		self.lsps5_service_handler.as_ref()
614	}
615
616	/// Returns a [`Future`] that will complete when the next batch of pending messages is ready to
617	/// be processed *or* we need to be repersisted.
618	///
619	/// Note that callbacks registered on the [`Future`] MUST NOT call back into this
620	/// [`LiquidityManager`] and should instead register actions to be taken later.
621	pub fn get_pending_msgs_or_needs_persist_future(&self) -> Future {
622		self.pending_msgs_or_needs_persist_notifier.get_future()
623	}
624
625	/// Blocks the current thread until next event is ready and returns it.
626	///
627	/// Only available via the [`LiquidityManagerSync`] interface to avoid having users
628	/// accidentally blocking their async contexts.
629	#[cfg(feature = "std")]
630	pub(crate) fn wait_next_event(&self) -> LiquidityEvent {
631		self.pending_events.wait_next_event()
632	}
633
634	/// Returns `Some` if an event is ready.
635	///
636	/// Typically you would spawn a thread or task that calls this in a loop.
637	///
638	/// **Note**: Users must handle events as soon as possible to avoid an increased event queue
639	/// memory footprint. We will start dropping any generated events after
640	/// [`MAX_EVENT_QUEUE_SIZE`] has been reached.
641	///
642	/// [`MAX_EVENT_QUEUE_SIZE`]: crate::events::MAX_EVENT_QUEUE_SIZE
643	pub fn next_event(&self) -> Option<LiquidityEvent> {
644		self.pending_events.next_event()
645	}
646
647	/// Asynchronously polls the event queue and returns once the next event is ready.
648	///
649	/// Typically you would spawn a thread or task that calls this in a loop.
650	///
651	/// **Note**: Users must handle events as soon as possible to avoid an increased event queue
652	/// memory footprint. We will start dropping any generated events after
653	/// [`MAX_EVENT_QUEUE_SIZE`] has been reached.
654	///
655	/// [`MAX_EVENT_QUEUE_SIZE`]: crate::events::MAX_EVENT_QUEUE_SIZE
656	pub async fn next_event_async(&self) -> LiquidityEvent {
657		self.pending_events.next_event_async().await
658	}
659
660	/// Returns and clears all events without blocking.
661	///
662	/// Typically you would spawn a thread or task that calls this in a loop.
663	///
664	/// **Note**: Users must handle events as soon as possible to avoid an increased event queue
665	/// memory footprint. We will start dropping any generated events after
666	/// [`MAX_EVENT_QUEUE_SIZE`] has been reached.
667	///
668	/// [`MAX_EVENT_QUEUE_SIZE`]: crate::events::MAX_EVENT_QUEUE_SIZE
669	pub fn get_and_clear_pending_events(&self) -> Vec<LiquidityEvent> {
670		self.pending_events.get_and_clear_pending_events()
671	}
672
673	/// Persists the state of the service handlers towards the given [`KVStore`] implementation.
674	///
675	/// This will be regularly called by LDK's background processor if necessary and only needs to
676	/// be called manually if it's not utilized.
677	pub async fn persist(&self) -> Result<(), lightning::io::Error> {
678		// TODO: We should eventually persist in parallel.
679		self.pending_events.persist().await?;
680
681		if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
682			lsps2_service_handler.persist().await?;
683		}
684
685		if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() {
686			lsps5_service_handler.persist().await?;
687		}
688
689		Ok(())
690	}
691
692	fn handle_lsps_message(
693		&self, msg: LSPSMessage, sender_node_id: &PublicKey,
694	) -> Result<(), lightning::ln::msgs::LightningError> {
695		match msg {
696			LSPSMessage::Invalid(_error) => {
697				return Err(LightningError { err: format!("{} did not understand a message we previously sent, maybe they don't support a protocol we are trying to use?", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Error)});
698			},
699			LSPSMessage::LSPS0(msg @ LSPS0Message::Response(..)) => {
700				self.lsps0_client_handler.handle_message(msg, sender_node_id)?;
701			},
702			LSPSMessage::LSPS0(msg @ LSPS0Message::Request(..)) => {
703				match &self.lsps0_service_handler {
704					Some(lsps0_service_handler) => {
705						lsps0_service_handler.handle_message(msg, sender_node_id)?;
706					},
707					None => {
708						return Err(LightningError { err: format!("Received LSPS0 request message without LSPS0 service handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
709					},
710				}
711			},
712			LSPSMessage::LSPS1(msg @ LSPS1Message::Response(..)) => {
713				match &self.lsps1_client_handler {
714					Some(lsps1_client_handler) => {
715						lsps1_client_handler.handle_message(msg, sender_node_id)?;
716					},
717					None => {
718						return Err(LightningError { err: format!("Received LSPS1 response message without LSPS1 client handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
719					},
720				}
721			},
722			LSPSMessage::LSPS1(_msg @ LSPS1Message::Request(..)) => {
723				#[cfg(lsps1_service)]
724				match &self.lsps1_service_handler {
725					Some(lsps1_service_handler) => {
726						lsps1_service_handler.handle_message(_msg, sender_node_id)?;
727					},
728					None => {
729						return Err(LightningError { err: format!("Received LSPS1 request message without LSPS1 service handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
730					},
731				}
732				#[cfg(not(lsps1_service))]
733				return Err(LightningError { err: format!("Received LSPS1 request message without LSPS1 service handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
734			},
735			LSPSMessage::LSPS2(msg @ LSPS2Message::Response(..)) => {
736				match &self.lsps2_client_handler {
737					Some(lsps2_client_handler) => {
738						lsps2_client_handler.handle_message(msg, sender_node_id)?;
739					},
740					None => {
741						return Err(LightningError { err: format!("Received LSPS2 response message without LSPS2 client handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
742					},
743				}
744			},
745			LSPSMessage::LSPS2(msg @ LSPS2Message::Request(..)) => {
746				match &self.lsps2_service_handler {
747					Some(lsps2_service_handler) => {
748						lsps2_service_handler.handle_message(msg, sender_node_id)?;
749					},
750					None => {
751						return Err(LightningError { err: format!("Received LSPS2 request message without LSPS2 service handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
752					},
753				}
754			},
755			LSPSMessage::LSPS5(msg @ LSPS5Message::Response(..)) => {
756				match &self.lsps5_client_handler {
757					Some(lsps5_client_handler) => {
758						lsps5_client_handler.handle_message(msg, sender_node_id)?;
759					},
760					None => {
761						return Err(LightningError { err: format!("Received LSPS5 response message without LSPS5 client handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
762					},
763				}
764			},
765			LSPSMessage::LSPS5(msg @ LSPS5Message::Request(..)) => {
766				match &self.lsps5_service_handler {
767					Some(lsps5_service_handler) => {
768						if let LSPS5Message::Request(ref req_id, ref req) = msg {
769							if req.is_state_allocating() {
770								let lsps2_has_active_requests = self
771									.lsps2_service_handler
772									.as_ref()
773									.map_or(false, |h| h.has_active_requests(sender_node_id));
774								#[cfg(lsps1_service)]
775								let lsps1_has_active_requests = self
776									.lsps1_service_handler
777									.as_ref()
778									.map_or(false, |h| h.has_active_requests(sender_node_id));
779								#[cfg(not(lsps1_service))]
780								let lsps1_has_active_requests = false;
781
782								lsps5_service_handler.enforce_prior_activity_or_reject(
783									sender_node_id,
784									lsps2_has_active_requests,
785									lsps1_has_active_requests,
786									req_id.clone(),
787								)?
788							}
789						}
790
791						lsps5_service_handler.handle_message(msg, sender_node_id)?;
792					},
793					None => {
794						return Err(LightningError { err: format!("Received LSPS5 request message without LSPS5 service handler configured. From node {}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Debug)});
795					},
796				}
797			},
798		}
799		Ok(())
800	}
801}
802
803impl<
804		ES: Deref + Clone,
805		NS: Deref + Clone,
806		CM: Deref + Clone,
807		C: Deref + Clone,
808		K: Deref + Clone,
809		TP: Deref + Clone,
810		T: Deref + Clone,
811	> CustomMessageReader for LiquidityManager<ES, NS, CM, C, K, TP, T>
812where
813	ES::Target: EntropySource,
814	NS::Target: NodeSigner,
815	CM::Target: AChannelManager,
816	C::Target: Filter,
817	K::Target: KVStore,
818	TP::Target: TimeProvider,
819	T::Target: BroadcasterInterface,
820{
821	type CustomMessage = RawLSPSMessage;
822
823	fn read<RD: LengthLimitedRead>(
824		&self, message_type: u16, buffer: &mut RD,
825	) -> Result<Option<Self::CustomMessage>, lightning::ln::msgs::DecodeError> {
826		match message_type {
827			LSPS_MESSAGE_TYPE_ID => {
828				Ok(Some(RawLSPSMessage::read_from_fixed_length_buffer(buffer)?))
829			},
830			_ => Ok(None),
831		}
832	}
833}
834
835impl<
836		ES: Deref + Clone,
837		NS: Deref + Clone,
838		CM: Deref + Clone,
839		C: Deref + Clone,
840		K: Deref + Clone,
841		TP: Deref + Clone,
842		T: Deref + Clone,
843	> CustomMessageHandler for LiquidityManager<ES, NS, CM, C, K, TP, T>
844where
845	ES::Target: EntropySource,
846	NS::Target: NodeSigner,
847	CM::Target: AChannelManager,
848	C::Target: Filter,
849	K::Target: KVStore,
850	TP::Target: TimeProvider,
851	T::Target: BroadcasterInterface,
852{
853	fn handle_custom_message(
854		&self, msg: Self::CustomMessage, sender_node_id: PublicKey,
855	) -> Result<(), lightning::ln::msgs::LightningError> {
856		{
857			if self.ignored_peers.read().unwrap().contains(&sender_node_id) {
858				let err = format!("Ignoring message from peer {}.", sender_node_id);
859				return Err(LightningError {
860					err,
861					action: ErrorAction::IgnoreAndLog(Level::Trace),
862				});
863			}
864		}
865
866		let message = {
867			{
868				let mut request_id_to_method_map = self.request_id_to_method_map.lock().unwrap();
869				LSPSMessage::from_str_with_id_map(&msg.payload, &mut request_id_to_method_map)
870			}
871			.map_err(|_| {
872				let mut message_queue_notifier = self.pending_messages.notifier();
873
874				let error = LSPSResponseError {
875					code: JSONRPC_INVALID_MESSAGE_ERROR_CODE,
876					message: JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE.to_string(),
877					data: None,
878				};
879
880				message_queue_notifier.enqueue(&sender_node_id, LSPSMessage::Invalid(error));
881				self.ignored_peers.write().unwrap().insert(sender_node_id);
882				let err = format!(
883					"Failed to deserialize invalid LSPS message. Ignoring peer {} from now on.",
884					sender_node_id
885				);
886				LightningError { err, action: ErrorAction::IgnoreAndLog(Level::Info) }
887			})?
888		};
889
890		self.handle_lsps_message(message, &sender_node_id)
891	}
892
893	fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> {
894		let pending_messages = self.pending_messages.get_and_clear_pending_msgs();
895
896		let mut request_ids_and_methods = pending_messages
897			.iter()
898			.filter_map(|(_, msg)| msg.get_request_id_and_method())
899			.peekable();
900
901		if request_ids_and_methods.peek().is_some() {
902			let mut request_id_to_method_map_lock = self.request_id_to_method_map.lock().unwrap();
903			for (request_id, method) in request_ids_and_methods {
904				request_id_to_method_map_lock.insert(request_id, method);
905			}
906		}
907
908		pending_messages
909			.into_iter()
910			.filter_map(|(public_key, msg)| {
911				serde_json::to_string(&msg)
912					.ok()
913					.map(|payload| (public_key, RawLSPSMessage { payload }))
914			})
915			.collect()
916	}
917
918	fn provided_node_features(&self) -> NodeFeatures {
919		let mut features = NodeFeatures::empty();
920
921		let advertise_service = self.service_config.as_ref().map_or(false, |c| c.advertise_service);
922
923		if advertise_service {
924			features
925				.set_optional_custom_bit(LSPS_FEATURE_BIT)
926				.expect("Failed to set LSPS feature bit");
927		}
928
929		features
930	}
931
932	fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
933		let mut features = InitFeatures::empty();
934
935		let advertise_service = self.service_config.as_ref().map_or(false, |c| c.advertise_service);
936		if advertise_service {
937			features
938				.set_optional_custom_bit(LSPS_FEATURE_BIT)
939				.expect("Failed to set LSPS feature bit");
940		}
941
942		features
943	}
944
945	fn peer_disconnected(&self, counterparty_node_id: bitcoin::secp256k1::PublicKey) {
946		// If the peer was misbehaving, drop it from the ignored list to cleanup the kept state.
947		self.ignored_peers.write().unwrap().remove(&counterparty_node_id);
948
949		if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
950			lsps2_service_handler.peer_disconnected(counterparty_node_id);
951		}
952
953		if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() {
954			lsps5_service_handler.peer_disconnected(&counterparty_node_id);
955		}
956	}
957	fn peer_connected(
958		&self, counterparty_node_id: bitcoin::secp256k1::PublicKey, _: &lightning::ln::msgs::Init,
959		_: bool,
960	) -> Result<(), ()> {
961		if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() {
962			lsps5_service_handler.peer_connected(&counterparty_node_id);
963		}
964
965		Ok(())
966	}
967}
968
969impl<
970		ES: Deref + Clone,
971		NS: Deref + Clone,
972		CM: Deref + Clone,
973		C: Deref + Clone,
974		K: Deref + Clone,
975		TP: Deref + Clone,
976		T: Deref + Clone,
977	> Listen for LiquidityManager<ES, NS, CM, C, K, TP, T>
978where
979	ES::Target: EntropySource,
980	NS::Target: NodeSigner,
981	CM::Target: AChannelManager,
982	C::Target: Filter,
983	K::Target: KVStore,
984	TP::Target: TimeProvider,
985	T::Target: BroadcasterInterface,
986{
987	fn filtered_block_connected(
988		&self, header: &bitcoin::block::Header, txdata: &chain::transaction::TransactionData,
989		height: u32,
990	) {
991		if let Some(best_block) = self.best_block.read().unwrap().as_ref() {
992			assert_eq!(best_block.block_hash, header.prev_blockhash,
993			"Blocks must be connected in chain-order - the connected header must build on the last connected header");
994			assert_eq!(best_block.height, height - 1,
995			"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
996		}
997
998		self.transactions_confirmed(header, txdata, height);
999		self.best_block_updated(header, height);
1000	}
1001
1002	fn blocks_disconnected(&self, fork_point: BestBlock) {
1003		if let Some(best_block) = self.best_block.write().unwrap().as_mut() {
1004			assert!(best_block.height > fork_point.height,
1005				"Blocks disconnected must indicate disconnection from the current best height, i.e. the new chain tip must be lower than the previous best height");
1006			*best_block = fork_point;
1007		}
1008
1009		// TODO: Call block_disconnected on all sub-modules that require it, e.g., LSPS1MessageHandler.
1010		// Internally this should call transaction_unconfirmed for all transactions that were
1011		// confirmed at a height <= the one we now disconnected.
1012	}
1013}
1014
1015impl<
1016		ES: Deref + Clone,
1017		NS: Deref + Clone,
1018		CM: Deref + Clone,
1019		C: Deref + Clone,
1020		K: Deref + Clone,
1021		TP: Deref + Clone,
1022		T: Deref + Clone,
1023	> Confirm for LiquidityManager<ES, NS, CM, C, K, TP, T>
1024where
1025	ES::Target: EntropySource,
1026	NS::Target: NodeSigner,
1027	CM::Target: AChannelManager,
1028	C::Target: Filter,
1029	K::Target: KVStore,
1030	TP::Target: TimeProvider,
1031	T::Target: BroadcasterInterface,
1032{
1033	fn transactions_confirmed(
1034		&self, _header: &bitcoin::block::Header, _txdata: &chain::transaction::TransactionData,
1035		_height: u32,
1036	) {
1037		// TODO: Call transactions_confirmed on all sub-modules that require it, e.g., LSPS1MessageHandler.
1038	}
1039
1040	fn transaction_unconfirmed(&self, _txid: &bitcoin::Txid) {
1041		// TODO: Call transaction_unconfirmed on all sub-modules that require it, e.g., LSPS1MessageHandler.
1042		// Internally this should call transaction_unconfirmed for all transactions that were
1043		// confirmed at a height <= the one we now unconfirmed.
1044	}
1045
1046	fn best_block_updated(&self, header: &bitcoin::block::Header, height: u32) {
1047		let new_best_block = BestBlock::new(header.block_hash(), height);
1048		*self.best_block.write().unwrap() = Some(new_best_block);
1049
1050		// TODO: Call best_block_updated on all sub-modules that require it, e.g., LSPS1MessageHandler.
1051	}
1052
1053	fn get_relevant_txids(&self) -> Vec<(bitcoin::Txid, u32, Option<bitcoin::BlockHash>)> {
1054		// TODO: Collect relevant txids from all sub-modules that, e.g., LSPS1MessageHandler.
1055		Vec::new()
1056	}
1057}
1058
/// A synchronous wrapper around [`LiquidityManager`] to be used in contexts where async is not
/// available.
pub struct LiquidityManagerSync<
	ES: Deref + Clone,
	NS: Deref + Clone,
	CM: Deref + Clone,
	C: Deref + Clone,
	KS: Deref + Clone,
	TP: Deref + Clone,
	T: Deref + Clone,
> where
	ES::Target: EntropySource,
	NS::Target: NodeSigner,
	CM::Target: AChannelManager,
	C::Target: Filter,
	KS::Target: KVStoreSync,
	TP::Target: TimeProvider,
	T::Target: BroadcasterInterface,
{
	// The wrapped async manager. Its KV-store parameter is the sync store `KS` wrapped in a
	// `KVStoreSyncWrapper`.
	inner: LiquidityManager<ES, NS, CM, C, KVStoreSyncWrapper<KS>, TP, T>,
}
1080
#[cfg(feature = "time")]
impl<
		ES: Deref + Clone,
		NS: Deref + Clone,
		CM: Deref + Clone,
		C: Deref + Clone,
		KS: Deref + Clone,
		T: Deref + Clone,
	> LiquidityManagerSync<ES, NS, CM, C, KS, DefaultTimeProvider, T>
where
	ES::Target: EntropySource,
	NS::Target: NodeSigner,
	CM::Target: AChannelManager,
	KS::Target: KVStoreSync,
	C::Target: Filter,
	T::Target: BroadcasterInterface,
{
	/// Builds a [`LiquidityManagerSync`] that uses the default system clock.
	///
	/// Wraps [`LiquidityManager::new`]: `kv_store_sync` is wrapped in a [`KVStoreSyncWrapper`]
	/// and the resulting constructor future is polled exactly once.
	///
	/// # Panics
	///
	/// Panics if that single poll does not complete the constructor future.
	pub fn new(
		entropy_source: ES, node_signer: NS, channel_manager: CM, chain_source: Option<C>,
		chain_params: Option<ChainParameters>, kv_store_sync: KS, transaction_broadcaster: T,
		service_config: Option<LiquidityServiceConfig>,
		client_config: Option<LiquidityClientConfig>,
	) -> Result<Self, lightning::io::Error> {
		let mut constructor_fut = Box::pin(LiquidityManager::new(
			entropy_source,
			node_signer,
			channel_manager,
			chain_source,
			chain_params,
			KVStoreSyncWrapper(kv_store_sync),
			transaction_broadcaster,
			service_config,
			client_config,
		));

		let waker = dummy_waker();
		let mut ctx = task::Context::from_waker(&waker);
		let inner = match constructor_fut.as_mut().poll(&mut ctx) {
			task::Poll::Ready(result) => result?,
			// In a sync context, we can't wait for the future to complete.
			task::Poll::Pending => {
				unreachable!("LiquidityManager::new should not be pending in a sync context")
			},
		};
		Ok(Self { inner })
	}
}
1133
1134impl<
1135		ES: Deref + Clone,
1136		NS: Deref + Clone,
1137		CM: Deref + Clone,
1138		C: Deref + Clone,
1139		KS: Deref + Clone,
1140		TP: Deref + Clone,
1141		T: Deref + Clone,
1142	> LiquidityManagerSync<ES, NS, CM, C, KS, TP, T>
1143where
1144	ES::Target: EntropySource,
1145	NS::Target: NodeSigner,
1146	CM::Target: AChannelManager,
1147	C::Target: Filter,
1148	KS::Target: KVStoreSync,
1149	TP::Target: TimeProvider,
1150	T::Target: BroadcasterInterface,
1151{
1152	/// Constructor for the [`LiquidityManagerSync`] with a custom time provider.
1153	///
1154	/// Wraps [`LiquidityManager::new_with_custom_time_provider`].
1155	pub fn new_with_custom_time_provider(
1156		entropy_source: ES, node_signer: NS, channel_manager: CM, chain_source: Option<C>,
1157		chain_params: Option<ChainParameters>, kv_store_sync: KS, transaction_broadcaster: T,
1158		service_config: Option<LiquidityServiceConfig>,
1159		client_config: Option<LiquidityClientConfig>, time_provider: TP,
1160	) -> Result<Self, lightning::io::Error> {
1161		let kv_store = KVStoreSyncWrapper(kv_store_sync);
1162		let mut fut = Box::pin(LiquidityManager::new_with_custom_time_provider(
1163			entropy_source,
1164			node_signer,
1165			channel_manager,
1166			transaction_broadcaster,
1167			chain_source,
1168			chain_params,
1169			kv_store,
1170			service_config,
1171			client_config,
1172			time_provider,
1173		));
1174
1175		let mut waker = dummy_waker();
1176		let mut ctx = task::Context::from_waker(&mut waker);
1177		let inner = match fut.as_mut().poll(&mut ctx) {
1178			task::Poll::Ready(result) => result,
1179			task::Poll::Pending => {
1180				// In a sync context, we can't wait for the future to complete.
1181				unreachable!("LiquidityManager::new should not be pending in a sync context");
1182			},
1183		}?;
1184		Ok(Self { inner })
1185	}
1186
1187	/// Returns a reference to the LSPS0 client-side handler.
1188	///
1189	/// Wraps [`LiquidityManager::lsps0_client_handler`].
1190	pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler<ES, KVStoreSyncWrapper<KS>> {
1191		self.inner.lsps0_client_handler()
1192	}
1193
1194	/// Returns a reference to the LSPS0 server-side handler.
1195	///
1196	/// Wraps [`LiquidityManager::lsps0_service_handler`].
1197	pub fn lsps0_service_handler(&self) -> Option<&LSPS0ServiceHandler> {
1198		self.inner.lsps0_service_handler()
1199	}
1200
1201	/// Returns a reference to the LSPS1 client-side handler.
1202	///
1203	/// Wraps [`LiquidityManager::lsps1_client_handler`].
1204	pub fn lsps1_client_handler(&self) -> Option<&LSPS1ClientHandler<ES, KVStoreSyncWrapper<KS>>> {
1205		self.inner.lsps1_client_handler()
1206	}
1207
1208	/// Returns a reference to the LSPS1 server-side handler.
1209	///
1210	/// Wraps [`LiquidityManager::lsps1_service_handler`].
1211	#[cfg(lsps1_service)]
1212	pub fn lsps1_service_handler(
1213		&self,
1214	) -> Option<&LSPS1ServiceHandler<ES, CM, C, KVStoreSyncWrapper<KS>>> {
1215		self.inner.lsps1_service_handler()
1216	}
1217
1218	/// Returns a reference to the LSPS2 client-side handler.
1219	///
1220	/// Wraps [`LiquidityManager::lsps2_client_handler`].
1221	pub fn lsps2_client_handler(&self) -> Option<&LSPS2ClientHandler<ES, KVStoreSyncWrapper<KS>>> {
1222		self.inner.lsps2_client_handler()
1223	}
1224
1225	/// Returns a reference to the LSPS2 server-side handler.
1226	///
1227	/// Wraps [`LiquidityManager::lsps2_service_handler`].
1228	pub fn lsps2_service_handler<'a>(
1229		&'a self,
1230	) -> Option<LSPS2ServiceHandlerSync<'a, CM, KVStoreSyncWrapper<KS>, T>> {
1231		self.inner.lsps2_service_handler.as_ref().map(|r| LSPS2ServiceHandlerSync::from_inner(r))
1232	}
1233
1234	/// Returns a reference to the LSPS5 client-side handler.
1235	///
1236	/// Wraps [`LiquidityManager::lsps5_client_handler`].
1237	pub fn lsps5_client_handler(&self) -> Option<&LSPS5ClientHandler<ES, KVStoreSyncWrapper<KS>>> {
1238		self.inner.lsps5_client_handler()
1239	}
1240
1241	/// Returns a reference to the LSPS5 server-side handler.
1242	///
1243	/// Wraps [`LiquidityManager::lsps5_service_handler`].
1244	pub fn lsps5_service_handler(
1245		&self,
1246	) -> Option<&LSPS5ServiceHandler<CM, NS, KVStoreSyncWrapper<KS>, TP>> {
1247		self.inner.lsps5_service_handler()
1248	}
1249
1250	/// Returns a [`Future`] that will complete when the next batch of pending messages is ready to
1251	/// be processed *or* we need to be repersisted.
1252	///
1253	/// Wraps [`LiquidityManager::get_pending_msgs_or_needs_persist_future`].
1254	pub fn get_pending_msgs_or_needs_persist_future(&self) -> Future {
1255		self.inner.get_pending_msgs_or_needs_persist_future()
1256	}
1257
1258	/// Blocks the current thread until next event is ready and returns it.
1259	///
1260	/// Typically you would spawn a thread or task that calls this in a loop.
1261	///
1262	/// **Note**: Users must handle events as soon as possible to avoid an increased event queue
1263	/// memory footprint. We will start dropping any generated events after
1264	/// [`MAX_EVENT_QUEUE_SIZE`] has been reached.
1265	///
1266	/// [`MAX_EVENT_QUEUE_SIZE`]: crate::events::MAX_EVENT_QUEUE_SIZE
1267	#[cfg(feature = "std")]
1268	pub fn wait_next_event(&self) -> LiquidityEvent {
1269		self.inner.wait_next_event()
1270	}
1271
1272	/// Returns `Some` if an event is ready.
1273	///
1274	/// Wraps [`LiquidityManager::next_event`].
1275	pub fn next_event(&self) -> Option<LiquidityEvent> {
1276		self.inner.next_event()
1277	}
1278
1279	/// Returns and clears all events without blocking.
1280	///
1281	/// Wraps [`LiquidityManager::get_and_clear_pending_events`].
1282	pub fn get_and_clear_pending_events(&self) -> Vec<LiquidityEvent> {
1283		self.inner.get_and_clear_pending_events()
1284	}
1285
1286	/// Persists the state of the service handlers towards the given [`KVStoreSync`] implementation.
1287	///
1288	/// Wraps [`LiquidityManager::persist`].
1289	pub fn persist(&self) -> Result<(), lightning::io::Error> {
1290		let mut waker = dummy_waker();
1291		let mut ctx = task::Context::from_waker(&mut waker);
1292		match Box::pin(self.inner.persist()).as_mut().poll(&mut ctx) {
1293			task::Poll::Ready(result) => result,
1294			task::Poll::Pending => {
1295				// In a sync context, we can't wait for the future to complete.
1296				unreachable!("LiquidityManager::persist should not be pending in a sync context");
1297			},
1298		}
1299	}
1300}
1301
1302impl<
1303		ES: Deref + Clone,
1304		NS: Deref + Clone,
1305		CM: Deref + Clone,
1306		C: Deref + Clone,
1307		KS: Deref + Clone,
1308		TP: Deref + Clone,
1309		T: Deref + Clone,
1310	> CustomMessageReader for LiquidityManagerSync<ES, NS, CM, C, KS, TP, T>
1311where
1312	ES::Target: EntropySource,
1313	NS::Target: NodeSigner,
1314	CM::Target: AChannelManager,
1315	C::Target: Filter,
1316	KS::Target: KVStoreSync,
1317	TP::Target: TimeProvider,
1318	T::Target: BroadcasterInterface,
1319{
1320	type CustomMessage = RawLSPSMessage;
1321
1322	fn read<RD: LengthLimitedRead>(
1323		&self, message_type: u16, buffer: &mut RD,
1324	) -> Result<Option<Self::CustomMessage>, lightning::ln::msgs::DecodeError> {
1325		self.inner.read(message_type, buffer)
1326	}
1327}
1328
1329impl<
1330		ES: Deref + Clone,
1331		NS: Deref + Clone,
1332		CM: Deref + Clone,
1333		C: Deref + Clone,
1334		KS: Deref + Clone,
1335		TP: Deref + Clone,
1336		T: Deref + Clone,
1337	> CustomMessageHandler for LiquidityManagerSync<ES, NS, CM, C, KS, TP, T>
1338where
1339	ES::Target: EntropySource,
1340	NS::Target: NodeSigner,
1341	CM::Target: AChannelManager,
1342	C::Target: Filter,
1343	KS::Target: KVStoreSync,
1344	TP::Target: TimeProvider,
1345	T::Target: BroadcasterInterface,
1346{
1347	fn handle_custom_message(
1348		&self, msg: Self::CustomMessage, sender_node_id: PublicKey,
1349	) -> Result<(), lightning::ln::msgs::LightningError> {
1350		self.inner.handle_custom_message(msg, sender_node_id)
1351	}
1352
1353	fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> {
1354		self.inner.get_and_clear_pending_msg()
1355	}
1356
1357	fn provided_node_features(&self) -> NodeFeatures {
1358		self.inner.provided_node_features()
1359	}
1360
1361	fn provided_init_features(&self, their_node_id: PublicKey) -> InitFeatures {
1362		self.inner.provided_init_features(their_node_id)
1363	}
1364
1365	fn peer_disconnected(&self, counterparty_node_id: bitcoin::secp256k1::PublicKey) {
1366		self.inner.peer_disconnected(counterparty_node_id)
1367	}
1368	fn peer_connected(
1369		&self, counterparty_node_id: bitcoin::secp256k1::PublicKey,
1370		init_msg: &lightning::ln::msgs::Init, inbound: bool,
1371	) -> Result<(), ()> {
1372		self.inner.peer_connected(counterparty_node_id, init_msg, inbound)
1373	}
1374}
1375
1376impl<
1377		ES: Deref + Clone,
1378		NS: Deref + Clone,
1379		CM: Deref + Clone,
1380		C: Deref + Clone,
1381		KS: Deref + Clone,
1382		TP: Deref + Clone,
1383		T: Deref + Clone,
1384	> Listen for LiquidityManagerSync<ES, NS, CM, C, KS, TP, T>
1385where
1386	ES::Target: EntropySource,
1387	NS::Target: NodeSigner,
1388	CM::Target: AChannelManager,
1389	C::Target: Filter,
1390	KS::Target: KVStoreSync,
1391	TP::Target: TimeProvider,
1392	T::Target: BroadcasterInterface,
1393{
1394	fn filtered_block_connected(
1395		&self, header: &bitcoin::block::Header, txdata: &chain::transaction::TransactionData,
1396		height: u32,
1397	) {
1398		self.inner.filtered_block_connected(header, txdata, height)
1399	}
1400
1401	fn blocks_disconnected(&self, fork_point: BestBlock) {
1402		self.inner.blocks_disconnected(fork_point);
1403	}
1404}
1405
1406impl<
1407		ES: Deref + Clone,
1408		NS: Deref + Clone,
1409		CM: Deref + Clone,
1410		C: Deref + Clone,
1411		KS: Deref + Clone,
1412		TP: Deref + Clone,
1413		T: Deref + Clone,
1414	> Confirm for LiquidityManagerSync<ES, NS, CM, C, KS, TP, T>
1415where
1416	ES::Target: EntropySource,
1417	NS::Target: NodeSigner,
1418	CM::Target: AChannelManager,
1419	C::Target: Filter,
1420	KS::Target: KVStoreSync,
1421	TP::Target: TimeProvider,
1422	T::Target: BroadcasterInterface,
1423{
1424	fn transactions_confirmed(
1425		&self, header: &bitcoin::block::Header, txdata: &chain::transaction::TransactionData,
1426		height: u32,
1427	) {
1428		self.inner.transactions_confirmed(header, txdata, height)
1429	}
1430
1431	fn transaction_unconfirmed(&self, txid: &bitcoin::Txid) {
1432		self.inner.transaction_unconfirmed(txid)
1433	}
1434
1435	fn best_block_updated(&self, header: &bitcoin::block::Header, height: u32) {
1436		self.inner.best_block_updated(header, height)
1437	}
1438
1439	fn get_relevant_txids(&self) -> Vec<(bitcoin::Txid, u32, Option<bitcoin::BlockHash>)> {
1440		self.inner.get_relevant_txids()
1441	}
1442}