lightning_liquidity/
manager.rs

1use crate::events::{Event, EventQueue};
2use crate::lsps0::client::LSPS0ClientHandler;
3use crate::lsps0::msgs::LSPS0Message;
4use crate::lsps0::ser::{
5	LSPSMessage, LSPSMethod, ProtocolMessageHandler, RawLSPSMessage, RequestId, ResponseError,
6	JSONRPC_INVALID_MESSAGE_ERROR_CODE, JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE,
7	LSPS_MESSAGE_TYPE_ID,
8};
9use crate::lsps0::service::LSPS0ServiceHandler;
10use crate::message_queue::{MessageQueue, ProcessMessagesCallback};
11
12use crate::lsps1::client::{LSPS1ClientConfig, LSPS1ClientHandler};
13use crate::lsps1::msgs::LSPS1Message;
14#[cfg(lsps1_service)]
15use crate::lsps1::service::{LSPS1ServiceConfig, LSPS1ServiceHandler};
16
17use crate::lsps2::client::{LSPS2ClientConfig, LSPS2ClientHandler};
18use crate::lsps2::msgs::LSPS2Message;
19use crate::lsps2::service::{LSPS2ServiceConfig, LSPS2ServiceHandler};
20use crate::prelude::{new_hash_map, new_hash_set, Box, HashMap, HashSet, ToString, Vec};
21use crate::sync::{Arc, Mutex, RwLock};
22
23use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
24use lightning::ln::channelmanager::{AChannelManager, ChainParameters};
25use lightning::ln::msgs::{ErrorAction, LightningError};
26use lightning::ln::peer_handler::CustomMessageHandler;
27use lightning::ln::wire::CustomMessageReader;
28use lightning::sign::EntropySource;
29use lightning::util::logger::Level;
30use lightning::util::ser::Readable;
31
32use lightning_types::features::{InitFeatures, NodeFeatures};
33
34use bitcoin::secp256k1::PublicKey;
35
36use core::ops::Deref;
37
/// Custom feature bit that is set (as optional) in the provided node and init features when
/// the liquidity service is configured to be advertised.
38const LSPS_FEATURE_BIT: usize = 729;
39
40/// A server-side configuration for [`LiquidityManager`].
41///
42/// Allows end-users to configure options when using the [`LiquidityManager`]
43/// to provide liquidity services to clients.
44pub struct LiquidityServiceConfig {
45	/// Optional server-side configuration for LSPS1 channel requests.
46	#[cfg(lsps1_service)]
47	pub lsps1_service_config: Option<LSPS1ServiceConfig>,
48	/// Optional server-side configuration for JIT channels
49	/// should you want to support them.
50	pub lsps2_service_config: Option<LSPS2ServiceConfig>,
51	/// Controls whether the liquidity service should be advertised via setting the feature bit in
52	/// the node announcement and the init message.
53	pub advertise_service: bool,
54}
55
56/// A client-side configuration for [`LiquidityManager`].
57///
58/// Allows end-users to configure options when using the [`LiquidityManager`]
59/// to access liquidity services from a provider.
60pub struct LiquidityClientConfig {
61	/// Optional client-side configuration for LSPS1 channel requests.
62	pub lsps1_client_config: Option<LSPS1ClientConfig>,
63	/// Optional client-side configuration for JIT channels.
64	pub lsps2_client_config: Option<LSPS2ClientConfig>,
65}
66
67/// The main interface into LSP functionality.
68///
69/// Should be used as a [`CustomMessageHandler`] for your [`PeerManager`]'s [`MessageHandler`].
70///
71/// Users should provide a callback to process queued messages via
72/// [`LiquidityManager::set_process_msgs_callback`] post construction. This allows the
73/// [`LiquidityManager`] to wake the [`PeerManager`] when there are pending messages to be sent.
74///
75/// Users need to continually poll [`LiquidityManager::get_and_clear_pending_events`] in order to surface
76/// [`Event`]s that likely need to be handled.
77///
78/// If the LSPS2 service is configured, users must forward the following parameters from LDK events:
79/// - [`Event::HTLCIntercepted`] to [`LSPS2ServiceHandler::htlc_intercepted`]
80/// - [`Event::ChannelReady`] to [`LSPS2ServiceHandler::channel_ready`]
81/// - [`Event::HTLCHandlingFailed`] to [`LSPS2ServiceHandler::htlc_handling_failed`]
82/// - [`Event::PaymentForwarded`] to [`LSPS2ServiceHandler::payment_forwarded`]
83///
84/// [`PeerManager`]: lightning::ln::peer_handler::PeerManager
85/// [`MessageHandler`]: lightning::ln::peer_handler::MessageHandler
86/// [`Event::HTLCIntercepted`]: lightning::events::Event::HTLCIntercepted
87/// [`Event::ChannelReady`]: lightning::events::Event::ChannelReady
88/// [`Event::HTLCHandlingFailed`]: lightning::events::Event::HTLCHandlingFailed
89/// [`Event::PaymentForwarded`]: lightning::events::Event::PaymentForwarded
90pub struct LiquidityManager<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone>
91where
92	ES::Target: EntropySource,
93	CM::Target: AChannelManager,
94	C::Target: Filter,
95{
	// Queue of outbound LSPS messages; a handle to it is given to every protocol handler and it
	// is drained in `get_and_clear_pending_msg`.
96	pending_messages: Arc<MessageQueue>,
	// Queue of events for the user; a handle to it is given to the protocol handlers and it is
	// read via the `*next_event*` / `get_and_clear_pending_events` methods.
97	pending_events: Arc<EventQueue>,
	// Records the method of every request we send out, keyed by request id. It is passed to the
	// message parser when deserializing incoming messages.
98	request_id_to_method_map: Mutex<HashMap<RequestId, LSPSMethod>>,
99	// We ignore peers if they send us bogus data. Entries are dropped again on disconnect.
100	ignored_peers: RwLock<HashSet<PublicKey>>,
101	lsps0_client_handler: LSPS0ClientHandler<ES>,
	// Only set if a service configuration was given at construction.
102	lsps0_service_handler: Option<LSPS0ServiceHandler>,
103	#[cfg(lsps1_service)]
104	lsps1_service_handler: Option<LSPS1ServiceHandler<ES, CM, C>>,
105	lsps1_client_handler: Option<LSPS1ClientHandler<ES>>,
106	lsps2_service_handler: Option<LSPS2ServiceHandler<CM>>,
107	lsps2_client_handler: Option<LSPS2ClientHandler<ES>>,
	// Consulted for `advertise_service` when building the provided node/init features.
108	service_config: Option<LiquidityServiceConfig>,
109	_client_config: Option<LiquidityClientConfig>,
	// Last chain tip we were told about; `None` until chain parameters or a block were provided.
110	best_block: RwLock<Option<BestBlock>>,
111	_chain_source: Option<C>,
112}
113
114impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> LiquidityManager<ES, CM, C>
115where
116	ES::Target: EntropySource,
117	CM::Target: AChannelManager,
118	C::Target: Filter,
119{
120	/// Constructor for the [`LiquidityManager`].
121	///
122	/// Sets up the required protocol message handlers based on the given
123	/// [`LiquidityClientConfig`] and [`LiquidityServiceConfig`].
124	pub fn new(
125		entropy_source: ES, channel_manager: CM, chain_source: Option<C>,
126		chain_params: Option<ChainParameters>, service_config: Option<LiquidityServiceConfig>,
127		client_config: Option<LiquidityClientConfig>,
128	) -> Self
129where {
130		let pending_messages = Arc::new(MessageQueue::new());
131		let pending_events = Arc::new(EventQueue::new());
132		let ignored_peers = RwLock::new(new_hash_set());
133
134		let mut supported_protocols = Vec::new();
135
136		let lsps2_client_handler = client_config.as_ref().and_then(|config| {
137			config.lsps2_client_config.map(|config| {
138				LSPS2ClientHandler::new(
139					entropy_source.clone(),
140					Arc::clone(&pending_messages),
141					Arc::clone(&pending_events),
142					config.clone(),
143				)
144			})
145		});
146		let lsps2_service_handler = service_config.as_ref().and_then(|config| {
147			config.lsps2_service_config.as_ref().map(|config| {
148				if let Some(number) =
149					<LSPS2ServiceHandler<CM> as ProtocolMessageHandler>::PROTOCOL_NUMBER
150				{
151					supported_protocols.push(number);
152				}
153				LSPS2ServiceHandler::new(
154					Arc::clone(&pending_messages),
155					Arc::clone(&pending_events),
156					channel_manager.clone(),
157					config.clone(),
158				)
159			})
160		});
161
162		let lsps1_client_handler = client_config.as_ref().and_then(|config| {
163			config.lsps1_client_config.as_ref().map(|config| {
164				LSPS1ClientHandler::new(
165					entropy_source.clone(),
166					Arc::clone(&pending_messages),
167					Arc::clone(&pending_events),
168					config.clone(),
169				)
170			})
171		});
172
173		#[cfg(lsps1_service)]
174		let lsps1_service_handler = service_config.as_ref().and_then(|config| {
175			if let Some(number) =
176				<LSPS1ServiceHandler<ES> as ProtocolMessageHandler>::PROTOCOL_NUMBER
177			{
178				supported_protocols.push(number);
179			}
180			config.lsps1_service_config.as_ref().map(|config| {
181				LSPS1ServiceHandler::new(
182					entropy_source.clone(),
183					Arc::clone(&pending_messages),
184					Arc::clone(&pending_events),
185					channel_manager.clone(),
186					chain_source.clone(),
187					config.clone(),
188				)
189			})
190		});
191
192		let lsps0_client_handler = LSPS0ClientHandler::new(
193			entropy_source.clone(),
194			Arc::clone(&pending_messages),
195			Arc::clone(&pending_events),
196		);
197
198		let lsps0_service_handler = if service_config.is_some() {
199			Some(LSPS0ServiceHandler::new(vec![], Arc::clone(&pending_messages)))
200		} else {
201			None
202		};
203
204		Self {
205			pending_messages,
206			pending_events,
207			request_id_to_method_map: Mutex::new(new_hash_map()),
208			ignored_peers,
209			lsps0_client_handler,
210			lsps0_service_handler,
211			lsps1_client_handler,
212			#[cfg(lsps1_service)]
213			lsps1_service_handler,
214			lsps2_client_handler,
215			lsps2_service_handler,
216			service_config,
217			_client_config: client_config,
218			best_block: RwLock::new(chain_params.map(|chain_params| chain_params.best_block)),
219			_chain_source: chain_source,
220		}
221	}
222
223	/// Returns a reference to the LSPS0 client-side handler.
224	pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler<ES> {
225		&self.lsps0_client_handler
226	}
227
228	/// Returns a reference to the LSPS0 server-side handler.
229	pub fn lsps0_service_handler(&self) -> Option<&LSPS0ServiceHandler> {
230		self.lsps0_service_handler.as_ref()
231	}
232
233	/// Returns a reference to the LSPS1 client-side handler.
234	///
235	/// The returned hendler allows to initiate the LSPS1 client-side flow, i.e., allows to request
236	/// channels from the configured LSP.
237	pub fn lsps1_client_handler(&self) -> Option<&LSPS1ClientHandler<ES>> {
238		self.lsps1_client_handler.as_ref()
239	}
240
241	/// Returns a reference to the LSPS1 server-side handler.
242	#[cfg(lsps1_service)]
243	pub fn lsps1_service_handler(&self) -> Option<&LSPS1ServiceHandler<ES, CM, C>> {
244		self.lsps1_service_handler.as_ref()
245	}
246
247	/// Returns a reference to the LSPS2 client-side handler.
248	///
249	/// The returned hendler allows to initiate the LSPS2 client-side flow. That is, it allows to
250	/// retrieve all necessary data to create 'just-in-time' invoices that, when paid, will have
251	/// the configured LSP open a 'just-in-time' channel.
252	pub fn lsps2_client_handler(&self) -> Option<&LSPS2ClientHandler<ES>> {
253		self.lsps2_client_handler.as_ref()
254	}
255
256	/// Returns a reference to the LSPS2 server-side handler.
257	///
258	/// The returned hendler allows to initiate the LSPS2 service-side flow.
259	pub fn lsps2_service_handler(&self) -> Option<&LSPS2ServiceHandler<CM>> {
260		self.lsps2_service_handler.as_ref()
261	}
262
263	/// Allows to set a callback that will be called after new messages are pushed to the message
264	/// queue.
265	///
266	/// Usually, you'll want to use this to call [`PeerManager::process_events`] to clear the
267	/// message queue. For example:
268	///
269	/// ```
270	/// # use lightning::io;
271	/// # use lightning_liquidity::LiquidityManager;
272	/// # use std::sync::{Arc, RwLock};
273	/// # use std::sync::atomic::{AtomicBool, Ordering};
274	/// # use std::time::SystemTime;
275	/// # struct MyStore {}
276	/// # impl lightning::util::persist::KVStore for MyStore {
277	/// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
278	/// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
279	/// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
280	/// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
281	/// # }
282	/// # struct MyEntropySource {}
283	/// # impl lightning::sign::EntropySource for MyEntropySource {
284	/// #     fn get_secure_random_bytes(&self) -> [u8; 32] { [0u8; 32] }
285	/// # }
286	/// # struct MyEventHandler {}
287	/// # impl MyEventHandler {
288	/// #     async fn handle_event(&self, _: lightning::events::Event) {}
289	/// # }
290	/// # #[derive(Eq, PartialEq, Clone, Hash)]
291	/// # struct MySocketDescriptor {}
292	/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
293	/// #     fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
294	/// #     fn disconnect_socket(&mut self) {}
295	/// # }
296	/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
297	/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
298	/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
299	/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
300	/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
301	/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
302	/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
303	/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
304	/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
305	/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
306	/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
307	/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
308	/// # type MyLiquidityManager = LiquidityManager<Arc<MyEntropySource>, Arc<MyChannelManager>, Arc<MyFilter>>;
309	/// # fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_logger: Arc<MyLogger>, my_peer_manager: Arc<MyPeerManager>, my_liquidity_manager: Arc<MyLiquidityManager>) {
310	/// let process_msgs_pm = Arc::clone(&my_peer_manager);
311	/// let process_msgs_callback = move || process_msgs_pm.process_events();
312	///
313	/// my_liquidity_manager.set_process_msgs_callback(process_msgs_callback);
314	/// # }
315	/// ```
316	///
317	/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
318	pub fn set_process_msgs_callback<F: 'static + ProcessMessagesCallback>(&self, callback: F) {
319		self.pending_messages.set_process_msgs_callback(Box::new(callback));
320	}
321
322	/// Blocks the current thread until next event is ready and returns it.
323	///
324	/// Typically you would spawn a thread or task that calls this in a loop.
325	///
326	/// **Note**: Users must handle events as soon as possible to avoid an increased event queue
327	/// memory footprint. We will start dropping any generated events after
328	/// [`MAX_EVENT_QUEUE_SIZE`] has been reached.
329	///
330	/// [`MAX_EVENT_QUEUE_SIZE`]: crate::events::MAX_EVENT_QUEUE_SIZE
331	#[cfg(feature = "std")]
332	pub fn wait_next_event(&self) -> Event {
333		self.pending_events.wait_next_event()
334	}
335
336	/// Returns `Some` if an event is ready.
337	///
338	/// Typically you would spawn a thread or task that calls this in a loop.
339	///
340	/// **Note**: Users must handle events as soon as possible to avoid an increased event queue
341	/// memory footprint. We will start dropping any generated events after
342	/// [`MAX_EVENT_QUEUE_SIZE`] has been reached.
343	///
344	/// [`MAX_EVENT_QUEUE_SIZE`]: crate::events::MAX_EVENT_QUEUE_SIZE
345	pub fn next_event(&self) -> Option<Event> {
346		self.pending_events.next_event()
347	}
348
349	/// Asynchronously polls the event queue and returns once the next event is ready.
350	///
351	/// Typically you would spawn a thread or task that calls this in a loop.
352	///
353	/// **Note**: Users must handle events as soon as possible to avoid an increased event queue
354	/// memory footprint. We will start dropping any generated events after
355	/// [`MAX_EVENT_QUEUE_SIZE`] has been reached.
356	///
357	/// [`MAX_EVENT_QUEUE_SIZE`]: crate::events::MAX_EVENT_QUEUE_SIZE
358	pub async fn next_event_async(&self) -> Event {
359		self.pending_events.next_event_async().await
360	}
361
362	/// Returns and clears all events without blocking.
363	///
364	/// Typically you would spawn a thread or task that calls this in a loop.
365	///
366	/// **Note**: Users must handle events as soon as possible to avoid an increased event queue
367	/// memory footprint. We will start dropping any generated events after
368	/// [`MAX_EVENT_QUEUE_SIZE`] has been reached.
369	///
370	/// [`MAX_EVENT_QUEUE_SIZE`]: crate::events::MAX_EVENT_QUEUE_SIZE
371	pub fn get_and_clear_pending_events(&self) -> Vec<Event> {
372		self.pending_events.get_and_clear_pending_events()
373	}
374
375	fn handle_lsps_message(
376		&self, msg: LSPSMessage, sender_node_id: &PublicKey,
377	) -> Result<(), lightning::ln::msgs::LightningError> {
378		match msg {
379			LSPSMessage::Invalid(_error) => {
380				return Err(LightningError { err: format!("{} did not understand a message we previously sent, maybe they don't support a protocol we are trying to use?", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Error)});
381			},
382			LSPSMessage::LSPS0(msg @ LSPS0Message::Response(..)) => {
383				self.lsps0_client_handler.handle_message(msg, sender_node_id)?;
384			},
385			LSPSMessage::LSPS0(msg @ LSPS0Message::Request(..)) => {
386				match &self.lsps0_service_handler {
387					Some(lsps0_service_handler) => {
388						lsps0_service_handler.handle_message(msg, sender_node_id)?;
389					},
390					None => {
391						return Err(LightningError { err: format!("Received LSPS0 request message without LSPS0 service handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)});
392					},
393				}
394			},
395			LSPSMessage::LSPS1(msg @ LSPS1Message::Response(..)) => {
396				match &self.lsps1_client_handler {
397					Some(lsps1_client_handler) => {
398						lsps1_client_handler.handle_message(msg, sender_node_id)?;
399					},
400					None => {
401						return Err(LightningError { err: format!("Received LSPS1 response message without LSPS1 client handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)});
402					},
403				}
404			},
405			LSPSMessage::LSPS1(_msg @ LSPS1Message::Request(..)) => {
406				#[cfg(lsps1_service)]
407				match &self.lsps1_service_handler {
408					Some(lsps1_service_handler) => {
409						lsps1_service_handler.handle_message(_msg, sender_node_id)?;
410					},
411					None => {
412						return Err(LightningError { err: format!("Received LSPS1 request message without LSPS1 service handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)});
413					},
414				}
415				#[cfg(not(lsps1_service))]
416				return Err(LightningError { err: format!("Received LSPS1 request message without LSPS1 service handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)});
417			},
418			LSPSMessage::LSPS2(msg @ LSPS2Message::Response(..)) => {
419				match &self.lsps2_client_handler {
420					Some(lsps2_client_handler) => {
421						lsps2_client_handler.handle_message(msg, sender_node_id)?;
422					},
423					None => {
424						return Err(LightningError { err: format!("Received LSPS2 response message without LSPS2 client handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)});
425					},
426				}
427			},
428			LSPSMessage::LSPS2(msg @ LSPS2Message::Request(..)) => {
429				match &self.lsps2_service_handler {
430					Some(lsps2_service_handler) => {
431						lsps2_service_handler.handle_message(msg, sender_node_id)?;
432					},
433					None => {
434						return Err(LightningError { err: format!("Received LSPS2 request message without LSPS2 service handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)});
435					},
436				}
437			},
438		}
439		Ok(())
440	}
441}
442
443impl<ES: Deref + Clone + Clone, CM: Deref + Clone, C: Deref + Clone> CustomMessageReader
444	for LiquidityManager<ES, CM, C>
445where
446	ES::Target: EntropySource,
447	CM::Target: AChannelManager,
448	C::Target: Filter,
449{
450	type CustomMessage = RawLSPSMessage;
451
452	fn read<RD: lightning::io::Read>(
453		&self, message_type: u16, buffer: &mut RD,
454	) -> Result<Option<Self::CustomMessage>, lightning::ln::msgs::DecodeError> {
455		match message_type {
456			LSPS_MESSAGE_TYPE_ID => Ok(Some(RawLSPSMessage::read(buffer)?)),
457			_ => Ok(None),
458		}
459	}
460}
461
462impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> CustomMessageHandler
463	for LiquidityManager<ES, CM, C>
464where
465	ES::Target: EntropySource,
466	CM::Target: AChannelManager,
467	C::Target: Filter,
468{
469	fn handle_custom_message(
470		&self, msg: Self::CustomMessage, sender_node_id: PublicKey,
471	) -> Result<(), lightning::ln::msgs::LightningError> {
472		{
473			if self.ignored_peers.read().unwrap().contains(&sender_node_id) {
474				let err = format!("Ignoring message from peer {}.", sender_node_id);
475				return Err(LightningError {
476					err,
477					action: ErrorAction::IgnoreAndLog(Level::Trace),
478				});
479			}
480		}
481
482		let message = {
483			{
484				let mut request_id_to_method_map = self.request_id_to_method_map.lock().unwrap();
485				LSPSMessage::from_str_with_id_map(&msg.payload, &mut request_id_to_method_map)
486			}
487			.map_err(|_| {
488				let error = ResponseError {
489					code: JSONRPC_INVALID_MESSAGE_ERROR_CODE,
490					message: JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE.to_string(),
491					data: None,
492				};
493
494				self.pending_messages.enqueue(&sender_node_id, LSPSMessage::Invalid(error));
495				self.ignored_peers.write().unwrap().insert(sender_node_id);
496				let err = format!(
497					"Failed to deserialize invalid LSPS message. Ignoring peer {} from now on.",
498					sender_node_id
499				);
500				LightningError { err, action: ErrorAction::IgnoreAndLog(Level::Info) }
501			})?
502		};
503
504		self.handle_lsps_message(message, &sender_node_id)
505	}
506
507	fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> {
508		let pending_messages = self.pending_messages.get_and_clear_pending_msgs();
509
510		let mut request_ids_and_methods = pending_messages
511			.iter()
512			.filter_map(|(_, msg)| msg.get_request_id_and_method())
513			.peekable();
514
515		if request_ids_and_methods.peek().is_some() {
516			let mut request_id_to_method_map_lock = self.request_id_to_method_map.lock().unwrap();
517			for (request_id, method) in request_ids_and_methods {
518				request_id_to_method_map_lock.insert(request_id, method);
519			}
520		}
521
522		pending_messages
523			.into_iter()
524			.filter_map(|(public_key, msg)| {
525				serde_json::to_string(&msg)
526					.ok()
527					.map(|payload| (public_key, RawLSPSMessage { payload }))
528			})
529			.collect()
530	}
531
532	fn provided_node_features(&self) -> NodeFeatures {
533		let mut features = NodeFeatures::empty();
534
535		let advertise_service = self.service_config.as_ref().map_or(false, |c| c.advertise_service);
536
537		if advertise_service {
538			features
539				.set_optional_custom_bit(LSPS_FEATURE_BIT)
540				.expect("Failed to set LSPS feature bit");
541		}
542
543		features
544	}
545
546	fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
547		let mut features = InitFeatures::empty();
548
549		let advertise_service = self.service_config.as_ref().map_or(false, |c| c.advertise_service);
550		if advertise_service {
551			features
552				.set_optional_custom_bit(LSPS_FEATURE_BIT)
553				.expect("Failed to set LSPS feature bit");
554		}
555
556		features
557	}
558
559	fn peer_disconnected(&self, counterparty_node_id: bitcoin::secp256k1::PublicKey) {
560		// If the peer was misbehaving, drop it from the ignored list to cleanup the kept state.
561		self.ignored_peers.write().unwrap().remove(&counterparty_node_id);
562
563		if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
564			lsps2_service_handler.peer_disconnected(counterparty_node_id);
565		}
566	}
567	fn peer_connected(
568		&self, _: bitcoin::secp256k1::PublicKey, _: &lightning::ln::msgs::Init, _: bool,
569	) -> Result<(), ()> {
570		Ok(())
571	}
572}
573
574impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> Listen for LiquidityManager<ES, CM, C>
575where
576	ES::Target: EntropySource,
577	CM::Target: AChannelManager,
578	C::Target: Filter,
579{
580	fn filtered_block_connected(
581		&self, header: &bitcoin::block::Header, txdata: &chain::transaction::TransactionData,
582		height: u32,
583	) {
584		if let Some(best_block) = self.best_block.read().unwrap().as_ref() {
585			assert_eq!(best_block.block_hash, header.prev_blockhash,
586			"Blocks must be connected in chain-order - the connected header must build on the last connected header");
587			assert_eq!(best_block.height, height - 1,
588			"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
589		}
590
591		self.transactions_confirmed(header, txdata, height);
592		self.best_block_updated(header, height);
593	}
594
595	fn block_disconnected(&self, header: &bitcoin::block::Header, height: u32) {
596		let new_height = height - 1;
597		if let Some(best_block) = self.best_block.write().unwrap().as_mut() {
598			assert_eq!(best_block.block_hash, header.block_hash(),
599				"Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
600			assert_eq!(best_block.height, height,
601				"Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
602			*best_block = BestBlock::new(header.prev_blockhash, new_height)
603		}
604
605		// TODO: Call block_disconnected on all sub-modules that require it, e.g., LSPS1MessageHandler.
606		// Internally this should call transaction_unconfirmed for all transactions that were
607		// confirmed at a height <= the one we now disconnected.
608	}
609}
610
611impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> Confirm for LiquidityManager<ES, CM, C>
612where
613	ES::Target: EntropySource,
614	CM::Target: AChannelManager,
615	C::Target: Filter,
616{
617	fn transactions_confirmed(
618		&self, _header: &bitcoin::block::Header, _txdata: &chain::transaction::TransactionData,
619		_height: u32,
620	) {
621		// TODO: Call transactions_confirmed on all sub-modules that require it, e.g., LSPS1MessageHandler.
622	}
623
624	fn transaction_unconfirmed(&self, _txid: &bitcoin::Txid) {
625		// TODO: Call transaction_unconfirmed on all sub-modules that require it, e.g., LSPS1MessageHandler.
626		// Internally this should call transaction_unconfirmed for all transactions that were
627		// confirmed at a height <= the one we now unconfirmed.
628	}
629
630	fn best_block_updated(&self, header: &bitcoin::block::Header, height: u32) {
631		let new_best_block = BestBlock::new(header.block_hash(), height);
632		*self.best_block.write().unwrap() = Some(new_best_block);
633
634		// TODO: Call best_block_updated on all sub-modules that require it, e.g., LSPS1MessageHandler.
635		if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
636			lsps2_service_handler.prune_peer_state();
637		}
638	}
639
640	fn get_relevant_txids(&self) -> Vec<(bitcoin::Txid, u32, Option<bitcoin::BlockHash>)> {
641		// TODO: Collect relevant txids from all sub-modules that, e.g., LSPS1MessageHandler.
642		Vec::new()
643	}
644}