1use crate::alloc::string::ToString;
13use crate::events::EventQueue;
14use crate::lsps0::ser::{LSPSDateTime, LSPSProtocolMessageHandler, LSPSRequestId};
15use crate::lsps5::msgs::{
16 ListWebhooksRequest, ListWebhooksResponse, RemoveWebhookRequest, RemoveWebhookResponse,
17 SetWebhookRequest, SetWebhookResponse, WebhookNotification, WebhookNotificationMethod,
18};
19use crate::message_queue::MessageQueue;
20use crate::persist::{
21 LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
22};
23use crate::prelude::hash_map::Entry;
24use crate::prelude::*;
25use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
26use crate::utils::time::TimeProvider;
27
28use bitcoin::secp256k1::PublicKey;
29
30use lightning::impl_writeable_tlv_based;
31use lightning::ln::channelmanager::AChannelManager;
32use lightning::ln::msgs::{ErrorAction, LightningError};
33use lightning::sign::NodeSigner;
34use lightning::util::logger::Level;
35use lightning::util::persist::KVStore;
36use lightning::util::ser::Writeable;
37
38use core::ops::Deref;
39use core::sync::atomic::{AtomicUsize, Ordering};
40use core::time::Duration;
41
42use alloc::string::String;
43use alloc::vec::Vec;
44
45use super::event::LSPS5ServiceEvent;
46use super::msgs::{
47 LSPS5AppName, LSPS5Message, LSPS5ProtocolError, LSPS5Request, LSPS5Response, LSPS5WebhookUrl,
48};
49
/// Retention window (30 days) that a webhook's `last_used` time is compared against when
/// stale webhooks are pruned.
pub const MIN_WEBHOOK_RETENTION_DAYS: Duration = Duration::from_secs(60 * 60 * 24 * 30);
/// Minimum time (one day) that has to pass between two stale-webhook pruning passes.
pub const PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS: Duration = Duration::from_secs(60 * 60 * 24);
54
55#[derive(Debug, Clone)]
57struct Webhook {
58 _app_name: LSPS5AppName,
59 url: LSPS5WebhookUrl,
60 _counterparty_node_id: PublicKey,
61 last_used: LSPSDateTime,
64 last_notification_sent: Option<LSPSDateTime>,
67}
68
69impl_writeable_tlv_based!(Webhook, {
70 (0, _app_name, required),
71 (2, url, required),
72 (4, _counterparty_node_id, required),
73 (6, last_used, required),
74 (8, last_notification_sent, option),
75});
76
/// Configuration of the LSPS5 service handler.
#[derive(Debug, Clone)]
pub struct LSPS5ServiceConfig {
    /// Upper bound on the number of webhooks a single client may have registered.
    pub max_webhooks_per_client: u32,
}

/// Value used for `max_webhooks_per_client` by `LSPS5ServiceConfig::default()`.
pub const DEFAULT_MAX_WEBHOOKS_PER_CLIENT: u32 = 10;
/// Cooldown (one minute) applied between notifications sent to a client's webhooks.
pub const NOTIFICATION_COOLDOWN_TIME: Duration = Duration::from_secs(60);

impl Default for LSPS5ServiceConfig {
    /// Returns a configuration allowing `DEFAULT_MAX_WEBHOOKS_PER_CLIENT` webhooks per client.
    fn default() -> Self {
        LSPS5ServiceConfig { max_webhooks_per_client: DEFAULT_MAX_WEBHOOKS_PER_CLIENT }
    }
}
95
96pub struct LSPS5ServiceHandler<CM: Deref, NS: Deref, K: Deref + Clone, TP: Deref>
129where
130 CM::Target: AChannelManager,
131 NS::Target: NodeSigner,
132 K::Target: KVStore,
133 TP::Target: TimeProvider,
134{
135 config: LSPS5ServiceConfig,
136 per_peer_state: RwLock<HashMap<PublicKey, PeerState>>,
137 event_queue: Arc<EventQueue<K>>,
138 pending_messages: Arc<MessageQueue>,
139 time_provider: TP,
140 channel_manager: CM,
141 node_signer: NS,
142 kv_store: K,
143 last_pruning: Mutex<Option<LSPSDateTime>>,
144 persistence_in_flight: AtomicUsize,
145}
146
147impl<CM: Deref, NS: Deref, K: Deref + Clone, TP: Deref> LSPS5ServiceHandler<CM, NS, K, TP>
148where
149 CM::Target: AChannelManager,
150 NS::Target: NodeSigner,
151 K::Target: KVStore,
152 TP::Target: TimeProvider,
153{
154 pub(crate) fn new_with_time_provider(
156 peer_states: HashMap<PublicKey, PeerState>, event_queue: Arc<EventQueue<K>>,
157 pending_messages: Arc<MessageQueue>, channel_manager: CM, kv_store: K, node_signer: NS,
158 config: LSPS5ServiceConfig, time_provider: TP,
159 ) -> Self {
160 assert!(config.max_webhooks_per_client > 0, "`max_webhooks_per_client` must be > 0");
161 let per_peer_state = RwLock::new(peer_states);
162 Self {
163 config,
164 per_peer_state,
165 event_queue,
166 pending_messages,
167 time_provider,
168 channel_manager,
169 node_signer,
170 kv_store,
171 last_pruning: Mutex::new(None),
172 persistence_in_flight: AtomicUsize::new(0),
173 }
174 }
175
176 pub(crate) fn enforce_prior_activity_or_reject(
179 &self, client_id: &PublicKey, lsps2_has_active_requests: bool, lsps1_has_activity: bool,
180 request_id: LSPSRequestId,
181 ) -> Result<(), LightningError> {
182 let can_accept = self.client_has_open_channel(client_id)
183 || lsps2_has_active_requests
184 || lsps1_has_activity;
185
186 let mut message_queue_notifier = self.pending_messages.notifier();
187 if !can_accept {
188 let error = LSPS5ProtocolError::NoPriorActivityError;
189 let msg = LSPS5Message::Response(
190 request_id,
191 LSPS5Response::SetWebhookError(error.clone().into()),
192 )
193 .into();
194 message_queue_notifier.enqueue(&client_id, msg);
195 return Err(LightningError {
196 err: error.message().into(),
197 action: ErrorAction::IgnoreAndLog(Level::Info),
198 });
199 } else {
200 Ok(())
201 }
202 }
203
204 async fn persist_peer_state(
205 &self, counterparty_node_id: PublicKey,
206 ) -> Result<(), lightning::io::Error> {
207 let fut = {
208 let mut outer_state_lock = self.per_peer_state.write().unwrap();
209 let encoded = match outer_state_lock.get_mut(&counterparty_node_id) {
210 None => {
211 return Ok(());
213 },
214 Some(entry) => {
215 if !entry.needs_persist {
216 return Ok(());
218 } else {
219 entry.needs_persist = false;
220 entry.encode()
221 }
222 },
223 };
224
225 let key = counterparty_node_id.to_string();
226
227 self.kv_store.write(
230 LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
231 LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
232 &key,
233 encoded,
234 )
235 };
236
237 fut.await.map_err(|e| {
238 self.per_peer_state
239 .write()
240 .unwrap()
241 .get_mut(&counterparty_node_id)
242 .map(|p| p.needs_persist = true);
243 e
244 })
245 }
246
247 pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> {
248 if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
253 return Ok(());
256 }
257
258 loop {
259 let mut need_remove = Vec::new();
260 let mut need_persist = Vec::new();
261
262 self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
263 {
264 let outer_state_lock = self.per_peer_state.read().unwrap();
265
266 for (client_id, peer_state) in outer_state_lock.iter() {
267 let is_prunable = peer_state.is_prunable();
268 let has_open_channel = self.client_has_open_channel(client_id);
269 if is_prunable && !has_open_channel {
270 need_remove.push(*client_id);
271 } else if peer_state.needs_persist {
272 need_persist.push(*client_id);
273 }
274 }
275 }
276
277 for client_id in need_persist.into_iter() {
278 debug_assert!(!need_remove.contains(&client_id));
279 self.persist_peer_state(client_id).await?;
280 }
281
282 for client_id in need_remove {
283 let mut future_opt = None;
284 {
285 let mut per_peer_state = self.per_peer_state.write().unwrap();
291 if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
292 let state = entry.get_mut();
293 if state.is_prunable() && !self.client_has_open_channel(&client_id) {
294 entry.remove();
295 let key = client_id.to_string();
296 future_opt = Some(self.kv_store.remove(
297 LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
298 LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
299 &key,
300 ));
301 } else {
302 state.needs_persist = true;
304 }
305 } else {
306 debug_assert!(false);
309 }
310 }
311 if let Some(future) = future_opt {
312 future.await?;
313 } else {
314 self.persist_peer_state(client_id).await?;
315 }
316 }
317
318 if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
319 self.persistence_in_flight.store(1, Ordering::Release);
322 continue;
323 }
324 break;
325 }
326
327 Ok(())
328 }
329
330 fn check_prune_stale_webhooks<'a>(
331 &self, outer_state_lock: &mut RwLockWriteGuard<'a, HashMap<PublicKey, PeerState>>,
332 ) {
333 let mut last_pruning = self.last_pruning.lock().unwrap();
334 let now =
335 LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
336
337 let should_prune = last_pruning.as_ref().map_or(true, |last_time| {
338 now.duration_since(&last_time) > PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS
339 });
340
341 if should_prune {
342 for (_, peer_state) in outer_state_lock.iter_mut() {
343 peer_state.prune_stale_webhooks(now)
346 }
347 *last_pruning = Some(now);
348 }
349 }
350
351 fn handle_set_webhook(
352 &self, counterparty_node_id: PublicKey, request_id: LSPSRequestId,
353 params: SetWebhookRequest,
354 ) -> Result<(), LightningError> {
355 let mut message_queue_notifier = self.pending_messages.notifier();
356
357 let mut outer_state_lock = self.per_peer_state.write().unwrap();
358
359 let peer_state =
360 outer_state_lock.entry(counterparty_node_id).or_insert_with(PeerState::default);
361
362 let now =
363 LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
364
365 let num_webhooks = peer_state.webhooks_len();
366 let mut no_change = false;
367
368 if let Some(webhook) = peer_state.webhook_mut(¶ms.app_name) {
369 no_change = webhook.url == params.webhook;
370 if !no_change {
371 webhook.url = params.webhook.clone();
373 webhook.last_used = now;
374 webhook.last_notification_sent = None;
375 peer_state.needs_persist |= true;
376 }
377 } else {
378 if num_webhooks >= self.config.max_webhooks_per_client as usize {
379 let error = LSPS5ProtocolError::TooManyWebhooks;
380 let msg = LSPS5Message::Response(
381 request_id,
382 LSPS5Response::SetWebhookError(error.clone().into()),
383 )
384 .into();
385 message_queue_notifier.enqueue(&counterparty_node_id, msg);
386 return Err(LightningError {
387 err: error.message().into(),
388 action: ErrorAction::IgnoreAndLog(Level::Info),
389 });
390 }
391
392 let webhook = Webhook {
393 _app_name: params.app_name.clone(),
394 url: params.webhook.clone(),
395 _counterparty_node_id: counterparty_node_id,
396 last_used: now,
397 last_notification_sent: None,
398 };
399
400 peer_state.insert_webhook(params.app_name.clone(), webhook);
401 }
402
403 if !no_change {
404 self.send_webhook_registered_notification(
405 counterparty_node_id,
406 params.app_name.clone(),
407 params.webhook,
408 )
409 .map_err(|e| {
410 let msg = LSPS5Message::Response(
411 request_id.clone(),
412 LSPS5Response::SetWebhookError(e.clone().into()),
413 )
414 .into();
415 message_queue_notifier.enqueue(&counterparty_node_id, msg);
416 LightningError {
417 err: e.message().into(),
418 action: ErrorAction::IgnoreAndLog(Level::Info),
419 }
420 })?;
421 }
422
423 let msg = LSPS5Message::Response(
424 request_id,
425 LSPS5Response::SetWebhook(SetWebhookResponse {
426 num_webhooks: peer_state.webhooks_len() as u32,
427 max_webhooks: self.config.max_webhooks_per_client,
428 no_change,
429 }),
430 )
431 .into();
432 message_queue_notifier.enqueue(&counterparty_node_id, msg);
433 Ok(())
434 }
435
436 fn handle_list_webhooks(
437 &self, counterparty_node_id: PublicKey, request_id: LSPSRequestId,
438 _params: ListWebhooksRequest,
439 ) -> Result<(), LightningError> {
440 let mut message_queue_notifier = self.pending_messages.notifier();
441
442 let outer_state_lock = self.per_peer_state.read().unwrap();
443 let app_names =
444 outer_state_lock.get(&counterparty_node_id).map(|p| p.app_names()).unwrap_or_default();
445
446 let max_webhooks = self.config.max_webhooks_per_client;
447
448 let response = ListWebhooksResponse { app_names, max_webhooks };
449 let msg = LSPS5Message::Response(request_id, LSPS5Response::ListWebhooks(response)).into();
450 message_queue_notifier.enqueue(&counterparty_node_id, msg);
451
452 Ok(())
453 }
454
455 fn handle_remove_webhook(
456 &self, counterparty_node_id: PublicKey, request_id: LSPSRequestId,
457 params: RemoveWebhookRequest,
458 ) -> Result<(), LightningError> {
459 let mut message_queue_notifier = self.pending_messages.notifier();
460
461 let mut outer_state_lock = self.per_peer_state.write().unwrap();
462
463 if let Some(peer_state) = outer_state_lock.get_mut(&counterparty_node_id) {
464 if peer_state.remove_webhook(¶ms.app_name) {
465 let response = RemoveWebhookResponse {};
466 let msg =
467 LSPS5Message::Response(request_id, LSPS5Response::RemoveWebhook(response))
468 .into();
469 message_queue_notifier.enqueue(&counterparty_node_id, msg);
470
471 return Ok(());
472 }
473 }
474
475 let error = LSPS5ProtocolError::AppNameNotFound;
476 let msg = LSPS5Message::Response(
477 request_id,
478 LSPS5Response::RemoveWebhookError(error.clone().into()),
479 )
480 .into();
481
482 message_queue_notifier.enqueue(&counterparty_node_id, msg);
483 return Err(LightningError {
484 err: error.message().into(),
485 action: ErrorAction::IgnoreAndLog(Level::Info),
486 });
487 }
488
489 fn send_webhook_registered_notification(
490 &self, client_node_id: PublicKey, app_name: LSPS5AppName, url: LSPS5WebhookUrl,
491 ) -> Result<(), LSPS5ProtocolError> {
492 let notification = WebhookNotification::webhook_registered();
493 self.send_notification(client_node_id, app_name, url, notification)
494 }
495
496 pub fn notify_payment_incoming(&self, client_id: PublicKey) -> Result<(), LSPS5ProtocolError> {
512 let notification = WebhookNotification::payment_incoming();
513 self.send_notifications_to_client_webhooks(client_id, notification)
514 }
515
516 pub fn notify_expiry_soon(
534 &self, client_id: PublicKey, timeout: u32,
535 ) -> Result<(), LSPS5ProtocolError> {
536 let notification = WebhookNotification::expiry_soon(timeout);
537 self.send_notifications_to_client_webhooks(client_id, notification)
538 }
539
540 pub fn notify_liquidity_management_request(
555 &self, client_id: PublicKey,
556 ) -> Result<(), LSPS5ProtocolError> {
557 let notification = WebhookNotification::liquidity_management_request();
558 self.send_notifications_to_client_webhooks(client_id, notification)
559 }
560
561 pub fn notify_onion_message_incoming(
576 &self, client_id: PublicKey,
577 ) -> Result<(), LSPS5ProtocolError> {
578 let notification = WebhookNotification::onion_message_incoming();
579 self.send_notifications_to_client_webhooks(client_id, notification)
580 }
581
582 fn send_notifications_to_client_webhooks(
583 &self, client_id: PublicKey, notification: WebhookNotification,
584 ) -> Result<(), LSPS5ProtocolError> {
585 let mut outer_state_lock = self.per_peer_state.write().unwrap();
586 let peer_state = if let Some(peer_state) = outer_state_lock.get_mut(&client_id) {
587 peer_state
588 } else {
589 return Ok(());
590 };
591
592 let now =
593 LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
594
595 if notification.method != WebhookNotificationMethod::LSPS5WebhookRegistered {
598 let rate_limit_applies = peer_state.webhooks().iter().any(|(_, webhook)| {
599 webhook.last_notification_sent.as_ref().map_or(false, |last_sent| {
600 now.duration_since(&last_sent) < NOTIFICATION_COOLDOWN_TIME
601 })
602 });
603
604 if rate_limit_applies {
605 return Err(LSPS5ProtocolError::SlowDownError);
606 }
607 }
608
609 for (app_name, webhook) in peer_state.webhooks_mut().iter_mut() {
610 self.send_notification(
611 client_id,
612 app_name.clone(),
613 webhook.url.clone(),
614 notification.clone(),
615 )?;
616 webhook.last_used = now;
617 webhook.last_notification_sent = Some(now);
618 }
619 Ok(())
620 }
621
622 fn send_notification(
623 &self, counterparty_node_id: PublicKey, app_name: LSPS5AppName, url: LSPS5WebhookUrl,
624 notification: WebhookNotification,
625 ) -> Result<(), LSPS5ProtocolError> {
626 let event_queue_notifier = self.event_queue.notifier();
627 let timestamp =
628 LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
629
630 let signature_hex = self.sign_notification(¬ification, ×tamp)?;
631
632 let mut headers: HashMap<String, String> = [("Content-Type", "application/json")]
633 .into_iter()
634 .map(|(k, v)| (k.to_string(), v.to_string()))
635 .collect();
636 headers.insert("x-lsps5-timestamp".into(), timestamp.to_rfc3339());
637 headers.insert("x-lsps5-signature".into(), signature_hex);
638
639 event_queue_notifier.enqueue(LSPS5ServiceEvent::SendWebhookNotification {
640 counterparty_node_id,
641 app_name,
642 url,
643 notification,
644 headers,
645 });
646
647 Ok(())
648 }
649
650 fn sign_notification(
651 &self, body: &WebhookNotification, timestamp: &LSPSDateTime,
652 ) -> Result<String, LSPS5ProtocolError> {
653 let notification_json =
654 serde_json::to_string(body).map_err(|_| LSPS5ProtocolError::SerializationError)?;
655
656 let message = format!(
657 "LSPS5: DO NOT SIGN THIS MESSAGE MANUALLY: LSP: At {} I notify {}",
658 timestamp.to_rfc3339(),
659 notification_json
660 );
661
662 self.node_signer
663 .sign_message(message.as_bytes())
664 .map_err(|_| LSPS5ProtocolError::UnknownError)
665 }
666
667 fn client_has_open_channel(&self, client_id: &PublicKey) -> bool {
668 self.channel_manager
669 .get_cm()
670 .list_channels()
671 .iter()
672 .any(|c| c.is_usable && c.counterparty.node_id == *client_id)
673 }
674
675 pub(crate) fn peer_connected(&self, counterparty_node_id: &PublicKey) {
676 let mut outer_state_lock = self.per_peer_state.write().unwrap();
677 if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) {
678 peer_state.reset_notification_cooldown();
679 }
680 self.check_prune_stale_webhooks(&mut outer_state_lock);
681 }
682
683 pub(crate) fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
684 let mut outer_state_lock = self.per_peer_state.write().unwrap();
685 if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) {
686 peer_state.reset_notification_cooldown();
687 }
688 self.check_prune_stale_webhooks(&mut outer_state_lock);
689 }
690}
691
692impl<CM: Deref, NS: Deref, K: Deref + Clone, TP: Deref> LSPSProtocolMessageHandler
693 for LSPS5ServiceHandler<CM, NS, K, TP>
694where
695 CM::Target: AChannelManager,
696 NS::Target: NodeSigner,
697 K::Target: KVStore,
698 TP::Target: TimeProvider,
699{
700 type ProtocolMessage = LSPS5Message;
701 const PROTOCOL_NUMBER: Option<u16> = Some(5);
702
703 fn handle_message(
704 &self, message: Self::ProtocolMessage, counterparty_node_id: &PublicKey,
705 ) -> Result<(), LightningError> {
706 match message {
707 LSPS5Message::Request(request_id, request) => {
708 let res = match request {
709 LSPS5Request::SetWebhook(params) => {
710 self.handle_set_webhook(*counterparty_node_id, request_id, params)
711 },
712 LSPS5Request::ListWebhooks(params) => {
713 self.handle_list_webhooks(*counterparty_node_id, request_id, params)
714 },
715 LSPS5Request::RemoveWebhook(params) => {
716 self.handle_remove_webhook(*counterparty_node_id, request_id, params)
717 },
718 };
719 res
720 },
721 _ => {
722 debug_assert!(
723 false,
724 "Service handler received LSPS5 response message. This should never happen."
725 );
726 let err = format!(
727 "Service handler received LSPS5 response message from node {}. This should never happen.",
728 counterparty_node_id
729 );
730 Err(LightningError { err, action: ErrorAction::IgnoreAndLog(Level::Info) })
731 },
732 }
733 }
734}
735
736#[derive(Debug)]
737pub(crate) struct PeerState {
738 webhooks: Vec<(LSPS5AppName, Webhook)>,
739 needs_persist: bool,
740}
741
742impl PeerState {
743 fn webhook_mut(&mut self, name: &LSPS5AppName) -> Option<&mut Webhook> {
744 let res =
745 self.webhooks.iter_mut().find_map(|(n, h)| if n == name { Some(h) } else { None });
746 self.needs_persist |= true;
747 res
748 }
749
750 fn webhooks(&self) -> &Vec<(LSPS5AppName, Webhook)> {
751 &self.webhooks
752 }
753
754 fn webhooks_mut(&mut self) -> &mut Vec<(LSPS5AppName, Webhook)> {
755 let res = &mut self.webhooks;
756 self.needs_persist |= true;
757 res
758 }
759
760 fn webhooks_len(&self) -> usize {
761 self.webhooks.len()
762 }
763
764 fn app_names(&self) -> Vec<LSPS5AppName> {
765 self.webhooks.iter().map(|(n, _)| n).cloned().collect()
766 }
767
768 fn insert_webhook(&mut self, name: LSPS5AppName, hook: Webhook) {
769 for (n, h) in self.webhooks.iter_mut() {
770 if *n == name {
771 *h = hook;
772 return;
773 }
774 }
775
776 self.webhooks.push((name, hook));
777 self.needs_persist |= true;
778 }
779
780 fn remove_webhook(&mut self, name: &LSPS5AppName) -> bool {
781 let mut removed = false;
782 self.webhooks.retain(|(n, _)| {
783 if n != name {
784 true
785 } else {
786 removed = true;
787 false
788 }
789 });
790 self.needs_persist |= true;
791 removed
792 }
793
794 fn reset_notification_cooldown(&mut self) {
795 for (_, h) in self.webhooks.iter_mut() {
796 h.last_notification_sent = None;
797 }
798 self.needs_persist |= true;
799 }
800
801 fn prune_stale_webhooks(&mut self, now: LSPSDateTime) {
803 self.webhooks.retain(|(_, webhook)| {
804 let should_prune = now.duration_since(&webhook.last_used) >= MIN_WEBHOOK_RETENTION_DAYS;
805 if should_prune {
806 self.needs_persist |= true;
807 }
808 !should_prune
809 });
810 }
811
812 fn is_prunable(&self) -> bool {
813 self.webhooks.is_empty()
814 }
815}
816
817impl Default for PeerState {
818 fn default() -> Self {
819 let webhooks = Vec::new();
820 let needs_persist = true;
821 Self { webhooks, needs_persist }
822 }
823}
824
825impl_writeable_tlv_based!(PeerState, {
826 (0, webhooks, required_vec),
827 (_unused, needs_persist, (static_value, false)),
828});