1use crate::events::{Event, EventQueue};
2use crate::lsps0::client::LSPS0ClientHandler;
3use crate::lsps0::msgs::LSPS0Message;
4use crate::lsps0::ser::{
5 LSPSMessage, LSPSMethod, ProtocolMessageHandler, RawLSPSMessage, RequestId, ResponseError,
6 JSONRPC_INVALID_MESSAGE_ERROR_CODE, JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE,
7 LSPS_MESSAGE_TYPE_ID,
8};
9use crate::lsps0::service::LSPS0ServiceHandler;
10use crate::message_queue::{MessageQueue, ProcessMessagesCallback};
11
12use crate::lsps1::client::{LSPS1ClientConfig, LSPS1ClientHandler};
13use crate::lsps1::msgs::LSPS1Message;
14#[cfg(lsps1_service)]
15use crate::lsps1::service::{LSPS1ServiceConfig, LSPS1ServiceHandler};
16
17use crate::lsps2::client::{LSPS2ClientConfig, LSPS2ClientHandler};
18use crate::lsps2::msgs::LSPS2Message;
19use crate::lsps2::service::{LSPS2ServiceConfig, LSPS2ServiceHandler};
20use crate::prelude::{new_hash_map, new_hash_set, Box, HashMap, HashSet, ToString, Vec};
21use crate::sync::{Arc, Mutex, RwLock};
22
23use lightning::chain::{self, BestBlock, Confirm, Filter, Listen};
24use lightning::ln::channelmanager::{AChannelManager, ChainParameters};
25use lightning::ln::msgs::{ErrorAction, LightningError};
26use lightning::ln::peer_handler::CustomMessageHandler;
27use lightning::ln::wire::CustomMessageReader;
28use lightning::sign::EntropySource;
29use lightning::util::logger::Level;
30use lightning::util::ser::Readable;
31
32use lightning_types::features::{InitFeatures, NodeFeatures};
33
34use bitcoin::secp256k1::PublicKey;
35
36use core::ops::Deref;
37
38const LSPS_FEATURE_BIT: usize = 729;
39
40pub struct LiquidityServiceConfig {
45 #[cfg(lsps1_service)]
47 pub lsps1_service_config: Option<LSPS1ServiceConfig>,
48 pub lsps2_service_config: Option<LSPS2ServiceConfig>,
51 pub advertise_service: bool,
54}
55
56pub struct LiquidityClientConfig {
61 pub lsps1_client_config: Option<LSPS1ClientConfig>,
63 pub lsps2_client_config: Option<LSPS2ClientConfig>,
65}
66
67pub struct LiquidityManager<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone>
91where
92 ES::Target: EntropySource,
93 CM::Target: AChannelManager,
94 C::Target: Filter,
95{
96 pending_messages: Arc<MessageQueue>,
97 pending_events: Arc<EventQueue>,
98 request_id_to_method_map: Mutex<HashMap<RequestId, LSPSMethod>>,
99 ignored_peers: RwLock<HashSet<PublicKey>>,
101 lsps0_client_handler: LSPS0ClientHandler<ES>,
102 lsps0_service_handler: Option<LSPS0ServiceHandler>,
103 #[cfg(lsps1_service)]
104 lsps1_service_handler: Option<LSPS1ServiceHandler<ES, CM, C>>,
105 lsps1_client_handler: Option<LSPS1ClientHandler<ES>>,
106 lsps2_service_handler: Option<LSPS2ServiceHandler<CM>>,
107 lsps2_client_handler: Option<LSPS2ClientHandler<ES>>,
108 service_config: Option<LiquidityServiceConfig>,
109 _client_config: Option<LiquidityClientConfig>,
110 best_block: RwLock<Option<BestBlock>>,
111 _chain_source: Option<C>,
112}
113
114impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> LiquidityManager<ES, CM, C>
115where
116 ES::Target: EntropySource,
117 CM::Target: AChannelManager,
118 C::Target: Filter,
119{
120 pub fn new(
125 entropy_source: ES, channel_manager: CM, chain_source: Option<C>,
126 chain_params: Option<ChainParameters>, service_config: Option<LiquidityServiceConfig>,
127 client_config: Option<LiquidityClientConfig>,
128 ) -> Self
129where {
130 let pending_messages = Arc::new(MessageQueue::new());
131 let pending_events = Arc::new(EventQueue::new());
132 let ignored_peers = RwLock::new(new_hash_set());
133
134 let mut supported_protocols = Vec::new();
135
136 let lsps2_client_handler = client_config.as_ref().and_then(|config| {
137 config.lsps2_client_config.map(|config| {
138 LSPS2ClientHandler::new(
139 entropy_source.clone(),
140 Arc::clone(&pending_messages),
141 Arc::clone(&pending_events),
142 config.clone(),
143 )
144 })
145 });
146 let lsps2_service_handler = service_config.as_ref().and_then(|config| {
147 config.lsps2_service_config.as_ref().map(|config| {
148 if let Some(number) =
149 <LSPS2ServiceHandler<CM> as ProtocolMessageHandler>::PROTOCOL_NUMBER
150 {
151 supported_protocols.push(number);
152 }
153 LSPS2ServiceHandler::new(
154 Arc::clone(&pending_messages),
155 Arc::clone(&pending_events),
156 channel_manager.clone(),
157 config.clone(),
158 )
159 })
160 });
161
162 let lsps1_client_handler = client_config.as_ref().and_then(|config| {
163 config.lsps1_client_config.as_ref().map(|config| {
164 LSPS1ClientHandler::new(
165 entropy_source.clone(),
166 Arc::clone(&pending_messages),
167 Arc::clone(&pending_events),
168 config.clone(),
169 )
170 })
171 });
172
173 #[cfg(lsps1_service)]
174 let lsps1_service_handler = service_config.as_ref().and_then(|config| {
175 if let Some(number) =
176 <LSPS1ServiceHandler<ES> as ProtocolMessageHandler>::PROTOCOL_NUMBER
177 {
178 supported_protocols.push(number);
179 }
180 config.lsps1_service_config.as_ref().map(|config| {
181 LSPS1ServiceHandler::new(
182 entropy_source.clone(),
183 Arc::clone(&pending_messages),
184 Arc::clone(&pending_events),
185 channel_manager.clone(),
186 chain_source.clone(),
187 config.clone(),
188 )
189 })
190 });
191
192 let lsps0_client_handler = LSPS0ClientHandler::new(
193 entropy_source.clone(),
194 Arc::clone(&pending_messages),
195 Arc::clone(&pending_events),
196 );
197
198 let lsps0_service_handler = if service_config.is_some() {
199 Some(LSPS0ServiceHandler::new(vec![], Arc::clone(&pending_messages)))
200 } else {
201 None
202 };
203
204 Self {
205 pending_messages,
206 pending_events,
207 request_id_to_method_map: Mutex::new(new_hash_map()),
208 ignored_peers,
209 lsps0_client_handler,
210 lsps0_service_handler,
211 lsps1_client_handler,
212 #[cfg(lsps1_service)]
213 lsps1_service_handler,
214 lsps2_client_handler,
215 lsps2_service_handler,
216 service_config,
217 _client_config: client_config,
218 best_block: RwLock::new(chain_params.map(|chain_params| chain_params.best_block)),
219 _chain_source: chain_source,
220 }
221 }
222
223 pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler<ES> {
225 &self.lsps0_client_handler
226 }
227
228 pub fn lsps0_service_handler(&self) -> Option<&LSPS0ServiceHandler> {
230 self.lsps0_service_handler.as_ref()
231 }
232
233 pub fn lsps1_client_handler(&self) -> Option<&LSPS1ClientHandler<ES>> {
238 self.lsps1_client_handler.as_ref()
239 }
240
241 #[cfg(lsps1_service)]
243 pub fn lsps1_service_handler(&self) -> Option<&LSPS1ServiceHandler<ES, CM, C>> {
244 self.lsps1_service_handler.as_ref()
245 }
246
247 pub fn lsps2_client_handler(&self) -> Option<&LSPS2ClientHandler<ES>> {
253 self.lsps2_client_handler.as_ref()
254 }
255
256 pub fn lsps2_service_handler(&self) -> Option<&LSPS2ServiceHandler<CM>> {
260 self.lsps2_service_handler.as_ref()
261 }
262
263 pub fn set_process_msgs_callback<F: 'static + ProcessMessagesCallback>(&self, callback: F) {
319 self.pending_messages.set_process_msgs_callback(Box::new(callback));
320 }
321
322 #[cfg(feature = "std")]
332 pub fn wait_next_event(&self) -> Event {
333 self.pending_events.wait_next_event()
334 }
335
336 pub fn next_event(&self) -> Option<Event> {
346 self.pending_events.next_event()
347 }
348
349 pub async fn next_event_async(&self) -> Event {
359 self.pending_events.next_event_async().await
360 }
361
362 pub fn get_and_clear_pending_events(&self) -> Vec<Event> {
372 self.pending_events.get_and_clear_pending_events()
373 }
374
375 fn handle_lsps_message(
376 &self, msg: LSPSMessage, sender_node_id: &PublicKey,
377 ) -> Result<(), lightning::ln::msgs::LightningError> {
378 match msg {
379 LSPSMessage::Invalid(_error) => {
380 return Err(LightningError { err: format!("{} did not understand a message we previously sent, maybe they don't support a protocol we are trying to use?", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Error)});
381 },
382 LSPSMessage::LSPS0(msg @ LSPS0Message::Response(..)) => {
383 self.lsps0_client_handler.handle_message(msg, sender_node_id)?;
384 },
385 LSPSMessage::LSPS0(msg @ LSPS0Message::Request(..)) => {
386 match &self.lsps0_service_handler {
387 Some(lsps0_service_handler) => {
388 lsps0_service_handler.handle_message(msg, sender_node_id)?;
389 },
390 None => {
391 return Err(LightningError { err: format!("Received LSPS0 request message without LSPS0 service handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)});
392 },
393 }
394 },
395 LSPSMessage::LSPS1(msg @ LSPS1Message::Response(..)) => {
396 match &self.lsps1_client_handler {
397 Some(lsps1_client_handler) => {
398 lsps1_client_handler.handle_message(msg, sender_node_id)?;
399 },
400 None => {
401 return Err(LightningError { err: format!("Received LSPS1 response message without LSPS1 client handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)});
402 },
403 }
404 },
405 LSPSMessage::LSPS1(_msg @ LSPS1Message::Request(..)) => {
406 #[cfg(lsps1_service)]
407 match &self.lsps1_service_handler {
408 Some(lsps1_service_handler) => {
409 lsps1_service_handler.handle_message(_msg, sender_node_id)?;
410 },
411 None => {
412 return Err(LightningError { err: format!("Received LSPS1 request message without LSPS1 service handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)});
413 },
414 }
415 #[cfg(not(lsps1_service))]
416 return Err(LightningError { err: format!("Received LSPS1 request message without LSPS1 service handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)});
417 },
418 LSPSMessage::LSPS2(msg @ LSPS2Message::Response(..)) => {
419 match &self.lsps2_client_handler {
420 Some(lsps2_client_handler) => {
421 lsps2_client_handler.handle_message(msg, sender_node_id)?;
422 },
423 None => {
424 return Err(LightningError { err: format!("Received LSPS2 response message without LSPS2 client handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)});
425 },
426 }
427 },
428 LSPSMessage::LSPS2(msg @ LSPS2Message::Request(..)) => {
429 match &self.lsps2_service_handler {
430 Some(lsps2_service_handler) => {
431 lsps2_service_handler.handle_message(msg, sender_node_id)?;
432 },
433 None => {
434 return Err(LightningError { err: format!("Received LSPS2 request message without LSPS2 service handler configured. From node = {:?}", sender_node_id), action: ErrorAction::IgnoreAndLog(Level::Info)});
435 },
436 }
437 },
438 }
439 Ok(())
440 }
441}
442
443impl<ES: Deref + Clone + Clone, CM: Deref + Clone, C: Deref + Clone> CustomMessageReader
444 for LiquidityManager<ES, CM, C>
445where
446 ES::Target: EntropySource,
447 CM::Target: AChannelManager,
448 C::Target: Filter,
449{
450 type CustomMessage = RawLSPSMessage;
451
452 fn read<RD: lightning::io::Read>(
453 &self, message_type: u16, buffer: &mut RD,
454 ) -> Result<Option<Self::CustomMessage>, lightning::ln::msgs::DecodeError> {
455 match message_type {
456 LSPS_MESSAGE_TYPE_ID => Ok(Some(RawLSPSMessage::read(buffer)?)),
457 _ => Ok(None),
458 }
459 }
460}
461
462impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> CustomMessageHandler
463 for LiquidityManager<ES, CM, C>
464where
465 ES::Target: EntropySource,
466 CM::Target: AChannelManager,
467 C::Target: Filter,
468{
469 fn handle_custom_message(
470 &self, msg: Self::CustomMessage, sender_node_id: PublicKey,
471 ) -> Result<(), lightning::ln::msgs::LightningError> {
472 {
473 if self.ignored_peers.read().unwrap().contains(&sender_node_id) {
474 let err = format!("Ignoring message from peer {}.", sender_node_id);
475 return Err(LightningError {
476 err,
477 action: ErrorAction::IgnoreAndLog(Level::Trace),
478 });
479 }
480 }
481
482 let message = {
483 {
484 let mut request_id_to_method_map = self.request_id_to_method_map.lock().unwrap();
485 LSPSMessage::from_str_with_id_map(&msg.payload, &mut request_id_to_method_map)
486 }
487 .map_err(|_| {
488 let error = ResponseError {
489 code: JSONRPC_INVALID_MESSAGE_ERROR_CODE,
490 message: JSONRPC_INVALID_MESSAGE_ERROR_MESSAGE.to_string(),
491 data: None,
492 };
493
494 self.pending_messages.enqueue(&sender_node_id, LSPSMessage::Invalid(error));
495 self.ignored_peers.write().unwrap().insert(sender_node_id);
496 let err = format!(
497 "Failed to deserialize invalid LSPS message. Ignoring peer {} from now on.",
498 sender_node_id
499 );
500 LightningError { err, action: ErrorAction::IgnoreAndLog(Level::Info) }
501 })?
502 };
503
504 self.handle_lsps_message(message, &sender_node_id)
505 }
506
507 fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> {
508 let pending_messages = self.pending_messages.get_and_clear_pending_msgs();
509
510 let mut request_ids_and_methods = pending_messages
511 .iter()
512 .filter_map(|(_, msg)| msg.get_request_id_and_method())
513 .peekable();
514
515 if request_ids_and_methods.peek().is_some() {
516 let mut request_id_to_method_map_lock = self.request_id_to_method_map.lock().unwrap();
517 for (request_id, method) in request_ids_and_methods {
518 request_id_to_method_map_lock.insert(request_id, method);
519 }
520 }
521
522 pending_messages
523 .into_iter()
524 .filter_map(|(public_key, msg)| {
525 serde_json::to_string(&msg)
526 .ok()
527 .map(|payload| (public_key, RawLSPSMessage { payload }))
528 })
529 .collect()
530 }
531
532 fn provided_node_features(&self) -> NodeFeatures {
533 let mut features = NodeFeatures::empty();
534
535 let advertise_service = self.service_config.as_ref().map_or(false, |c| c.advertise_service);
536
537 if advertise_service {
538 features
539 .set_optional_custom_bit(LSPS_FEATURE_BIT)
540 .expect("Failed to set LSPS feature bit");
541 }
542
543 features
544 }
545
546 fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
547 let mut features = InitFeatures::empty();
548
549 let advertise_service = self.service_config.as_ref().map_or(false, |c| c.advertise_service);
550 if advertise_service {
551 features
552 .set_optional_custom_bit(LSPS_FEATURE_BIT)
553 .expect("Failed to set LSPS feature bit");
554 }
555
556 features
557 }
558
559 fn peer_disconnected(&self, counterparty_node_id: bitcoin::secp256k1::PublicKey) {
560 self.ignored_peers.write().unwrap().remove(&counterparty_node_id);
562
563 if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
564 lsps2_service_handler.peer_disconnected(counterparty_node_id);
565 }
566 }
567 fn peer_connected(
568 &self, _: bitcoin::secp256k1::PublicKey, _: &lightning::ln::msgs::Init, _: bool,
569 ) -> Result<(), ()> {
570 Ok(())
571 }
572}
573
574impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> Listen for LiquidityManager<ES, CM, C>
575where
576 ES::Target: EntropySource,
577 CM::Target: AChannelManager,
578 C::Target: Filter,
579{
580 fn filtered_block_connected(
581 &self, header: &bitcoin::block::Header, txdata: &chain::transaction::TransactionData,
582 height: u32,
583 ) {
584 if let Some(best_block) = self.best_block.read().unwrap().as_ref() {
585 assert_eq!(best_block.block_hash, header.prev_blockhash,
586 "Blocks must be connected in chain-order - the connected header must build on the last connected header");
587 assert_eq!(best_block.height, height - 1,
588 "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
589 }
590
591 self.transactions_confirmed(header, txdata, height);
592 self.best_block_updated(header, height);
593 }
594
595 fn block_disconnected(&self, header: &bitcoin::block::Header, height: u32) {
596 let new_height = height - 1;
597 if let Some(best_block) = self.best_block.write().unwrap().as_mut() {
598 assert_eq!(best_block.block_hash, header.block_hash(),
599 "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
600 assert_eq!(best_block.height, height,
601 "Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
602 *best_block = BestBlock::new(header.prev_blockhash, new_height)
603 }
604
605 }
609}
610
611impl<ES: Deref + Clone, CM: Deref + Clone, C: Deref + Clone> Confirm for LiquidityManager<ES, CM, C>
612where
613 ES::Target: EntropySource,
614 CM::Target: AChannelManager,
615 C::Target: Filter,
616{
617 fn transactions_confirmed(
618 &self, _header: &bitcoin::block::Header, _txdata: &chain::transaction::TransactionData,
619 _height: u32,
620 ) {
621 }
623
624 fn transaction_unconfirmed(&self, _txid: &bitcoin::Txid) {
625 }
629
630 fn best_block_updated(&self, header: &bitcoin::block::Header, height: u32) {
631 let new_best_block = BestBlock::new(header.block_hash(), height);
632 *self.best_block.write().unwrap() = Some(new_best_block);
633
634 if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
636 lsps2_service_handler.prune_peer_state();
637 }
638 }
639
640 fn get_relevant_txids(&self) -> Vec<(bitcoin::Txid, u32, Option<bitcoin::BlockHash>)> {
641 Vec::new()
643 }
644}