lightning_liquidity/lsps5/
service.rs

1// This file is Copyright its original authors, visible in version control
2// history.
3//
4// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7// You may not use this file except in accordance with one or both of these
8// licenses.
9
10//! Service implementation for LSPS5 webhook registration.
11
12use crate::alloc::string::ToString;
13use crate::events::EventQueue;
14use crate::lsps0::ser::{LSPSDateTime, LSPSProtocolMessageHandler, LSPSRequestId};
15use crate::lsps5::msgs::{
16	ListWebhooksRequest, ListWebhooksResponse, RemoveWebhookRequest, RemoveWebhookResponse,
17	SetWebhookRequest, SetWebhookResponse, WebhookNotification, WebhookNotificationMethod,
18};
19use crate::message_queue::MessageQueue;
20use crate::persist::{
21	LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
22};
23use crate::prelude::hash_map::Entry;
24use crate::prelude::*;
25use crate::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
26use crate::utils::time::TimeProvider;
27
28use bitcoin::secp256k1::PublicKey;
29
30use lightning::impl_writeable_tlv_based;
31use lightning::ln::channelmanager::AChannelManager;
32use lightning::ln::msgs::{ErrorAction, LightningError};
33use lightning::sign::NodeSigner;
34use lightning::util::logger::Level;
35use lightning::util::persist::KVStore;
36use lightning::util::ser::Writeable;
37
38use core::ops::Deref;
39use core::sync::atomic::{AtomicUsize, Ordering};
40use core::time::Duration;
41
42use alloc::string::String;
43use alloc::vec::Vec;
44
45use super::event::LSPS5ServiceEvent;
46use super::msgs::{
47	LSPS5AppName, LSPS5Message, LSPS5ProtocolError, LSPS5Request, LSPS5Response, LSPS5WebhookUrl,
48};
49
50/// Minimum number of days to retain webhooks after a client's last channel is closed.
51pub const MIN_WEBHOOK_RETENTION_DAYS: Duration = Duration::from_secs(30 * 24 * 60 * 60);
52/// Interval for pruning stale webhooks.
53pub const PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS: Duration = Duration::from_secs(24 * 60 * 60);
54
55/// A stored webhook.
56#[derive(Debug, Clone)]
57struct Webhook {
58	_app_name: LSPS5AppName,
59	url: LSPS5WebhookUrl,
60	_counterparty_node_id: PublicKey,
61	// Timestamp used for tracking when the webhook was created / updated, or when the last notification was sent.
62	// This is used to determine if the webhook is stale and should be pruned.
63	last_used: LSPSDateTime,
64	// Timestamp when we last sent a notification to the client. This is used to enforce
65	// notification cooldowns.
66	last_notification_sent: Option<LSPSDateTime>,
67}
68
69impl_writeable_tlv_based!(Webhook, {
70	(0, _app_name, required),
71	(2, url, required),
72	(4, _counterparty_node_id, required),
73	(6, last_used, required),
74	(8, last_notification_sent, option),
75});
76
77/// Server-side configuration options for LSPS5 Webhook Registration.
78#[derive(Clone, Debug)]
79pub struct LSPS5ServiceConfig {
80	/// Maximum number of webhooks allowed per client.
81	pub max_webhooks_per_client: u32,
82}
83
84/// Default maximum number of webhooks allowed per client.
85pub const DEFAULT_MAX_WEBHOOKS_PER_CLIENT: u32 = 10;
86/// Default notification cooldown time in minutes.
87pub const NOTIFICATION_COOLDOWN_TIME: Duration = Duration::from_secs(60); // 1 minute
88
89// Default configuration for LSPS5 service.
90impl Default for LSPS5ServiceConfig {
91	fn default() -> Self {
92		Self { max_webhooks_per_client: DEFAULT_MAX_WEBHOOKS_PER_CLIENT }
93	}
94}
95
96/// Service-side handler for the [`bLIP-55 / LSPS5`] webhook registration protocol.
97///
98/// Runs on the LSP (server) side. Stores and manages client-registered webhooks,
99/// enforces per-client limits and retention policies, and emits signed JSON-RPC
100/// notifications to each webhook endpoint when events occur.
101///
102/// # Core Responsibilities
103/// - Handle incoming JSON-RPC requests:
104///   - `lsps5.set_webhook` -> insert or replace a webhook, enforce [`max_webhooks_per_client`],
105/// and send an initial [`lsps5.webhook_registered`] notification if new or changed.
106///   - `lsps5.list_webhooks` -> return all registered [`app_name`]s via response.
107///   - `lsps5.remove_webhook` -> delete a named webhook or return [`app_name_not_found`] error.
108/// - Prune stale webhooks after a client has no open channels and no activity for at least
109/// [`MIN_WEBHOOK_RETENTION_DAYS`].
110/// - Sign and enqueue outgoing webhook notifications:
111///   - Construct JSON-RPC 2.0 Notification objects [`WebhookNotification`],
112///   - Timestamp and LN-style zbase32-sign each payload,
113///   - Emit [`LSPS5ServiceEvent::SendWebhookNotification`] with HTTP headers.
114///
115/// # Security & Spec Compliance
116/// - All notifications are signed with the LSP's node key according to bLIP-50/LSPS0.
117/// - Clients must validate signature, timestamp (±10 min), and replay protection via
118///   `LSPS5ClientHandler::parse_webhook_notification`.
119/// - Webhook endpoints use only HTTPS and must guard against unauthorized calls.
120///
121/// [`bLIP-55 / LSPS5`]: https://github.com/lightning/blips/pull/55/files
122/// [`max_webhooks_per_client`]: super::service::LSPS5ServiceConfig::max_webhooks_per_client
123/// [`app_name_not_found`]: super::msgs::LSPS5ProtocolError::AppNameNotFound
124/// [`WebhookNotification`]: super::msgs::WebhookNotification
125/// [`LSPS5ServiceEvent::SendWebhookNotification`]: super::event::LSPS5ServiceEvent::SendWebhookNotification
126/// [`app_name`]: super::msgs::LSPS5AppName
127/// [`lsps5.webhook_registered`]: super::msgs::WebhookNotificationMethod::LSPS5WebhookRegistered
128pub struct LSPS5ServiceHandler<CM: Deref, NS: Deref, K: Deref + Clone, TP: Deref>
129where
130	CM::Target: AChannelManager,
131	NS::Target: NodeSigner,
132	K::Target: KVStore,
133	TP::Target: TimeProvider,
134{
135	config: LSPS5ServiceConfig,
136	per_peer_state: RwLock<HashMap<PublicKey, PeerState>>,
137	event_queue: Arc<EventQueue<K>>,
138	pending_messages: Arc<MessageQueue>,
139	time_provider: TP,
140	channel_manager: CM,
141	node_signer: NS,
142	kv_store: K,
143	last_pruning: Mutex<Option<LSPSDateTime>>,
144	persistence_in_flight: AtomicUsize,
145}
146
147impl<CM: Deref, NS: Deref, K: Deref + Clone, TP: Deref> LSPS5ServiceHandler<CM, NS, K, TP>
148where
149	CM::Target: AChannelManager,
150	NS::Target: NodeSigner,
151	K::Target: KVStore,
152	TP::Target: TimeProvider,
153{
154	/// Constructs a `LSPS5ServiceHandler` using the given time provider.
155	pub(crate) fn new_with_time_provider(
156		peer_states: HashMap<PublicKey, PeerState>, event_queue: Arc<EventQueue<K>>,
157		pending_messages: Arc<MessageQueue>, channel_manager: CM, kv_store: K, node_signer: NS,
158		config: LSPS5ServiceConfig, time_provider: TP,
159	) -> Self {
160		assert!(config.max_webhooks_per_client > 0, "`max_webhooks_per_client` must be > 0");
161		let per_peer_state = RwLock::new(peer_states);
162		Self {
163			config,
164			per_peer_state,
165			event_queue,
166			pending_messages,
167			time_provider,
168			channel_manager,
169			node_signer,
170			kv_store,
171			last_pruning: Mutex::new(None),
172			persistence_in_flight: AtomicUsize::new(0),
173		}
174	}
175
176	/// Enforces the prior-activity requirement for state-allocating LSPS5 requests (e.g.
177	/// `lsps5.set_webhook`), rejecting and replying with `NoPriorActivityError` if not met.
178	pub(crate) fn enforce_prior_activity_or_reject(
179		&self, client_id: &PublicKey, lsps2_has_active_requests: bool, lsps1_has_activity: bool,
180		request_id: LSPSRequestId,
181	) -> Result<(), LightningError> {
182		let can_accept = self.client_has_open_channel(client_id)
183			|| lsps2_has_active_requests
184			|| lsps1_has_activity;
185
186		let mut message_queue_notifier = self.pending_messages.notifier();
187		if !can_accept {
188			let error = LSPS5ProtocolError::NoPriorActivityError;
189			let msg = LSPS5Message::Response(
190				request_id,
191				LSPS5Response::SetWebhookError(error.clone().into()),
192			)
193			.into();
194			message_queue_notifier.enqueue(&client_id, msg);
195			return Err(LightningError {
196				err: error.message().into(),
197				action: ErrorAction::IgnoreAndLog(Level::Info),
198			});
199		} else {
200			Ok(())
201		}
202	}
203
204	async fn persist_peer_state(
205		&self, counterparty_node_id: PublicKey,
206	) -> Result<(), lightning::io::Error> {
207		let fut = {
208			let mut outer_state_lock = self.per_peer_state.write().unwrap();
209			let encoded = match outer_state_lock.get_mut(&counterparty_node_id) {
210				None => {
211					// We dropped the peer state by now.
212					return Ok(());
213				},
214				Some(entry) => {
215					if !entry.needs_persist {
216						// We already have persisted otherwise by now.
217						return Ok(());
218					} else {
219						entry.needs_persist = false;
220						entry.encode()
221					}
222				},
223			};
224
225			let key = counterparty_node_id.to_string();
226
227			// Begin the write with the `per_peer_state` write lock held to avoid racing with
228			// potentially-in-flight `persist` calls writing state for the same peer.
229			self.kv_store.write(
230				LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
231				LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
232				&key,
233				encoded,
234			)
235		};
236
237		fut.await.map_err(|e| {
238			self.per_peer_state
239				.write()
240				.unwrap()
241				.get_mut(&counterparty_node_id)
242				.map(|p| p.needs_persist = true);
243			e
244		})
245	}
246
247	pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> {
248		// TODO: We should eventually persist in parallel, however, when we do, we probably want to
249		// introduce some batching to upper-bound the number of requests inflight at any given
250		// time.
251
252		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
253			// If we're not the first event processor to get here, just return early, the increment
254			// we just did will be treated as "go around again" at the end.
255			return Ok(());
256		}
257
258		loop {
259			let mut need_remove = Vec::new();
260			let mut need_persist = Vec::new();
261
262			self.check_prune_stale_webhooks(&mut self.per_peer_state.write().unwrap());
263			{
264				let outer_state_lock = self.per_peer_state.read().unwrap();
265
266				for (client_id, peer_state) in outer_state_lock.iter() {
267					let is_prunable = peer_state.is_prunable();
268					let has_open_channel = self.client_has_open_channel(client_id);
269					if is_prunable && !has_open_channel {
270						need_remove.push(*client_id);
271					} else if peer_state.needs_persist {
272						need_persist.push(*client_id);
273					}
274				}
275			}
276
277			for client_id in need_persist.into_iter() {
278				debug_assert!(!need_remove.contains(&client_id));
279				self.persist_peer_state(client_id).await?;
280			}
281
282			for client_id in need_remove {
283				let mut future_opt = None;
284				{
285					// We need to take the `per_peer_state` write lock to remove an entry, but also
286					// have to hold it until after the `remove` call returns (but not through
287					// future completion) to ensure that writes for the peer's state are
288					// well-ordered with other `persist_peer_state` calls even across the removal
289					// itself.
290					let mut per_peer_state = self.per_peer_state.write().unwrap();
291					if let Entry::Occupied(mut entry) = per_peer_state.entry(client_id) {
292						let state = entry.get_mut();
293						if state.is_prunable() && !self.client_has_open_channel(&client_id) {
294							entry.remove();
295							let key = client_id.to_string();
296							future_opt = Some(self.kv_store.remove(
297								LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
298								LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
299								&key,
300							));
301						} else {
302							// If the peer was re-added, force a re-persist of the current state.
303							state.needs_persist = true;
304						}
305					} else {
306						// This should never happen, we can only have one `persist` call
307						// in-progress at once and map entries are only removed by it.
308						debug_assert!(false);
309					}
310				}
311				if let Some(future) = future_opt {
312					future.await?;
313				} else {
314					self.persist_peer_state(client_id).await?;
315				}
316			}
317
318			if self.persistence_in_flight.fetch_sub(1, Ordering::AcqRel) != 1 {
319				// If another thread incremented the state while we were running we should go
320				// around again, but only once.
321				self.persistence_in_flight.store(1, Ordering::Release);
322				continue;
323			}
324			break;
325		}
326
327		Ok(())
328	}
329
330	fn check_prune_stale_webhooks<'a>(
331		&self, outer_state_lock: &mut RwLockWriteGuard<'a, HashMap<PublicKey, PeerState>>,
332	) {
333		let mut last_pruning = self.last_pruning.lock().unwrap();
334		let now =
335			LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
336
337		let should_prune = last_pruning.as_ref().map_or(true, |last_time| {
338			now.duration_since(&last_time) > PRUNE_STALE_WEBHOOKS_INTERVAL_DAYS
339		});
340
341		if should_prune {
342			for (_, peer_state) in outer_state_lock.iter_mut() {
343				// Prune stale webhooks, but leave removal of the peers states to the prune logic
344				// in `persist` which will remove it from the store.
345				peer_state.prune_stale_webhooks(now)
346			}
347			*last_pruning = Some(now);
348		}
349	}
350
351	fn handle_set_webhook(
352		&self, counterparty_node_id: PublicKey, request_id: LSPSRequestId,
353		params: SetWebhookRequest,
354	) -> Result<(), LightningError> {
355		let mut message_queue_notifier = self.pending_messages.notifier();
356
357		let mut outer_state_lock = self.per_peer_state.write().unwrap();
358
359		let peer_state =
360			outer_state_lock.entry(counterparty_node_id).or_insert_with(PeerState::default);
361
362		let now =
363			LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
364
365		let num_webhooks = peer_state.webhooks_len();
366		let mut no_change = false;
367
368		if let Some(webhook) = peer_state.webhook_mut(&params.app_name) {
369			no_change = webhook.url == params.webhook;
370			if !no_change {
371				// The URL was updated.
372				webhook.url = params.webhook.clone();
373				webhook.last_used = now;
374				webhook.last_notification_sent = None;
375				peer_state.needs_persist |= true;
376			}
377		} else {
378			if num_webhooks >= self.config.max_webhooks_per_client as usize {
379				let error = LSPS5ProtocolError::TooManyWebhooks;
380				let msg = LSPS5Message::Response(
381					request_id,
382					LSPS5Response::SetWebhookError(error.clone().into()),
383				)
384				.into();
385				message_queue_notifier.enqueue(&counterparty_node_id, msg);
386				return Err(LightningError {
387					err: error.message().into(),
388					action: ErrorAction::IgnoreAndLog(Level::Info),
389				});
390			}
391
392			let webhook = Webhook {
393				_app_name: params.app_name.clone(),
394				url: params.webhook.clone(),
395				_counterparty_node_id: counterparty_node_id,
396				last_used: now,
397				last_notification_sent: None,
398			};
399
400			peer_state.insert_webhook(params.app_name.clone(), webhook);
401		}
402
403		if !no_change {
404			self.send_webhook_registered_notification(
405				counterparty_node_id,
406				params.app_name.clone(),
407				params.webhook,
408			)
409			.map_err(|e| {
410				let msg = LSPS5Message::Response(
411					request_id.clone(),
412					LSPS5Response::SetWebhookError(e.clone().into()),
413				)
414				.into();
415				message_queue_notifier.enqueue(&counterparty_node_id, msg);
416				LightningError {
417					err: e.message().into(),
418					action: ErrorAction::IgnoreAndLog(Level::Info),
419				}
420			})?;
421		}
422
423		let msg = LSPS5Message::Response(
424			request_id,
425			LSPS5Response::SetWebhook(SetWebhookResponse {
426				num_webhooks: peer_state.webhooks_len() as u32,
427				max_webhooks: self.config.max_webhooks_per_client,
428				no_change,
429			}),
430		)
431		.into();
432		message_queue_notifier.enqueue(&counterparty_node_id, msg);
433		Ok(())
434	}
435
436	fn handle_list_webhooks(
437		&self, counterparty_node_id: PublicKey, request_id: LSPSRequestId,
438		_params: ListWebhooksRequest,
439	) -> Result<(), LightningError> {
440		let mut message_queue_notifier = self.pending_messages.notifier();
441
442		let outer_state_lock = self.per_peer_state.read().unwrap();
443		let app_names =
444			outer_state_lock.get(&counterparty_node_id).map(|p| p.app_names()).unwrap_or_default();
445
446		let max_webhooks = self.config.max_webhooks_per_client;
447
448		let response = ListWebhooksResponse { app_names, max_webhooks };
449		let msg = LSPS5Message::Response(request_id, LSPS5Response::ListWebhooks(response)).into();
450		message_queue_notifier.enqueue(&counterparty_node_id, msg);
451
452		Ok(())
453	}
454
455	fn handle_remove_webhook(
456		&self, counterparty_node_id: PublicKey, request_id: LSPSRequestId,
457		params: RemoveWebhookRequest,
458	) -> Result<(), LightningError> {
459		let mut message_queue_notifier = self.pending_messages.notifier();
460
461		let mut outer_state_lock = self.per_peer_state.write().unwrap();
462
463		if let Some(peer_state) = outer_state_lock.get_mut(&counterparty_node_id) {
464			if peer_state.remove_webhook(&params.app_name) {
465				let response = RemoveWebhookResponse {};
466				let msg =
467					LSPS5Message::Response(request_id, LSPS5Response::RemoveWebhook(response))
468						.into();
469				message_queue_notifier.enqueue(&counterparty_node_id, msg);
470
471				return Ok(());
472			}
473		}
474
475		let error = LSPS5ProtocolError::AppNameNotFound;
476		let msg = LSPS5Message::Response(
477			request_id,
478			LSPS5Response::RemoveWebhookError(error.clone().into()),
479		)
480		.into();
481
482		message_queue_notifier.enqueue(&counterparty_node_id, msg);
483		return Err(LightningError {
484			err: error.message().into(),
485			action: ErrorAction::IgnoreAndLog(Level::Info),
486		});
487	}
488
489	fn send_webhook_registered_notification(
490		&self, client_node_id: PublicKey, app_name: LSPS5AppName, url: LSPS5WebhookUrl,
491	) -> Result<(), LSPS5ProtocolError> {
492		let notification = WebhookNotification::webhook_registered();
493		self.send_notification(client_node_id, app_name, url, notification)
494	}
495
496	/// Notify the LSP service that the client has one or more incoming payments pending.
497	///
498	/// SHOULD be called by your LSP application logic as soon as you detect an incoming
499	/// payment (HTLC or future mechanism) for `client_id`.
500	/// This builds a [`WebhookNotificationMethod::LSPS5PaymentIncoming`] webhook notification, signs it with your
501	/// node key, and enqueues HTTP POSTs to all registered webhook URLs for that client.
502	///
503	/// This may fail if a similar notification was sent too recently,
504	/// violating the notification cooldown period defined in [`NOTIFICATION_COOLDOWN_TIME`].
505	///
506	/// # Parameters
507	/// - `client_id`: the client's node-ID whose webhooks should be invoked.
508	///
509	/// [`WebhookNotificationMethod::LSPS5PaymentIncoming`]: super::msgs::WebhookNotificationMethod::LSPS5PaymentIncoming
510	/// [`NOTIFICATION_COOLDOWN_TIME`]: super::service::NOTIFICATION_COOLDOWN_TIME
511	pub fn notify_payment_incoming(&self, client_id: PublicKey) -> Result<(), LSPS5ProtocolError> {
512		let notification = WebhookNotification::payment_incoming();
513		self.send_notifications_to_client_webhooks(client_id, notification)
514	}
515
516	/// Notify that an HTLC or other time-bound contract is expiring soon.
517	///
518	/// SHOULD be called by your LSP application logic when a channel contract for `client_id`
519	/// is within 24 blocks of timeout, and the timeout would cause a channel closure.
520	/// Builds a [`WebhookNotificationMethod::LSPS5ExpirySoon`] notification including
521	/// the `timeout` block height, signs it, and enqueues HTTP POSTs to the client's
522	/// registered webhooks.
523	///
524	/// This may fail if a similar notification was sent too recently,
525	/// violating the notification cooldown period defined in [`NOTIFICATION_COOLDOWN_TIME`].
526	///
527	/// # Parameters
528	/// - `client_id`: the client's node-ID whose webhooks should be invoked.
529	/// - `timeout`: the block height at which the channel contract will expire.
530	///
531	/// [`WebhookNotificationMethod::LSPS5ExpirySoon`]: super::msgs::WebhookNotificationMethod::LSPS5ExpirySoon
532	/// [`NOTIFICATION_COOLDOWN_TIME`]: super::service::NOTIFICATION_COOLDOWN_TIME
533	pub fn notify_expiry_soon(
534		&self, client_id: PublicKey, timeout: u32,
535	) -> Result<(), LSPS5ProtocolError> {
536		let notification = WebhookNotification::expiry_soon(timeout);
537		self.send_notifications_to_client_webhooks(client_id, notification)
538	}
539
540	/// Notify that the LSP intends to manage liquidity (e.g. close or splice) on client channels.
541	///
542	/// SHOULD be called by your LSP application logic when you decide to reclaim or adjust
543	/// liquidity for `client_id`. Builds a [`WebhookNotificationMethod::LSPS5LiquidityManagementRequest`] notification,
544	/// signs it, and sends it to all of the client's registered webhook URLs.
545	///
546	/// This may fail if a similar notification was sent too recently,
547	/// violating the notification cooldown period defined in [`NOTIFICATION_COOLDOWN_TIME`].
548	///
549	/// # Parameters
550	/// - `client_id`: the client's node-ID whose webhooks should be invoked.
551	///
552	/// [`WebhookNotificationMethod::LSPS5LiquidityManagementRequest`]: super::msgs::WebhookNotificationMethod::LSPS5LiquidityManagementRequest
553	/// [`NOTIFICATION_COOLDOWN_TIME`]: super::service::NOTIFICATION_COOLDOWN_TIME
554	pub fn notify_liquidity_management_request(
555		&self, client_id: PublicKey,
556	) -> Result<(), LSPS5ProtocolError> {
557		let notification = WebhookNotification::liquidity_management_request();
558		self.send_notifications_to_client_webhooks(client_id, notification)
559	}
560
561	/// Notify that the client has one or more pending BOLT Onion Messages.
562	///
563	/// SHOULD be called by your LSP application logic when you receive Onion Messages
564	/// for `client_id` while the client is offline. Builds a [`WebhookNotificationMethod::LSPS5OnionMessageIncoming`]
565	/// notification, signs it, and enqueues HTTP POSTs to each registered webhook.
566	///
567	/// This may fail if a similar notification was sent too recently,
568	/// violating the notification cooldown period defined in [`NOTIFICATION_COOLDOWN_TIME`].
569	///
570	/// # Parameters
571	/// - `client_id`: the client's node-ID whose webhooks should be invoked.
572	///
573	/// [`WebhookNotificationMethod::LSPS5OnionMessageIncoming`]: super::msgs::WebhookNotificationMethod::LSPS5OnionMessageIncoming
574	/// [`NOTIFICATION_COOLDOWN_TIME`]: super::service::NOTIFICATION_COOLDOWN_TIME
575	pub fn notify_onion_message_incoming(
576		&self, client_id: PublicKey,
577	) -> Result<(), LSPS5ProtocolError> {
578		let notification = WebhookNotification::onion_message_incoming();
579		self.send_notifications_to_client_webhooks(client_id, notification)
580	}
581
582	fn send_notifications_to_client_webhooks(
583		&self, client_id: PublicKey, notification: WebhookNotification,
584	) -> Result<(), LSPS5ProtocolError> {
585		let mut outer_state_lock = self.per_peer_state.write().unwrap();
586		let peer_state = if let Some(peer_state) = outer_state_lock.get_mut(&client_id) {
587			peer_state
588		} else {
589			return Ok(());
590		};
591
592		let now =
593			LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
594
595		// We must avoid sending multiple notifications of the same method
596		// (other than lsps5.webhook_registered) close in time.
597		if notification.method != WebhookNotificationMethod::LSPS5WebhookRegistered {
598			let rate_limit_applies = peer_state.webhooks().iter().any(|(_, webhook)| {
599				webhook.last_notification_sent.as_ref().map_or(false, |last_sent| {
600					now.duration_since(&last_sent) < NOTIFICATION_COOLDOWN_TIME
601				})
602			});
603
604			if rate_limit_applies {
605				return Err(LSPS5ProtocolError::SlowDownError);
606			}
607		}
608
609		for (app_name, webhook) in peer_state.webhooks_mut().iter_mut() {
610			self.send_notification(
611				client_id,
612				app_name.clone(),
613				webhook.url.clone(),
614				notification.clone(),
615			)?;
616			webhook.last_used = now;
617			webhook.last_notification_sent = Some(now);
618		}
619		Ok(())
620	}
621
622	fn send_notification(
623		&self, counterparty_node_id: PublicKey, app_name: LSPS5AppName, url: LSPS5WebhookUrl,
624		notification: WebhookNotification,
625	) -> Result<(), LSPS5ProtocolError> {
626		let event_queue_notifier = self.event_queue.notifier();
627		let timestamp =
628			LSPSDateTime::new_from_duration_since_epoch(self.time_provider.duration_since_epoch());
629
630		let signature_hex = self.sign_notification(&notification, &timestamp)?;
631
632		let mut headers: HashMap<String, String> = [("Content-Type", "application/json")]
633			.into_iter()
634			.map(|(k, v)| (k.to_string(), v.to_string()))
635			.collect();
636		headers.insert("x-lsps5-timestamp".into(), timestamp.to_rfc3339());
637		headers.insert("x-lsps5-signature".into(), signature_hex);
638
639		event_queue_notifier.enqueue(LSPS5ServiceEvent::SendWebhookNotification {
640			counterparty_node_id,
641			app_name,
642			url,
643			notification,
644			headers,
645		});
646
647		Ok(())
648	}
649
650	fn sign_notification(
651		&self, body: &WebhookNotification, timestamp: &LSPSDateTime,
652	) -> Result<String, LSPS5ProtocolError> {
653		let notification_json =
654			serde_json::to_string(body).map_err(|_| LSPS5ProtocolError::SerializationError)?;
655
656		let message = format!(
657			"LSPS5: DO NOT SIGN THIS MESSAGE MANUALLY: LSP: At {} I notify {}",
658			timestamp.to_rfc3339(),
659			notification_json
660		);
661
662		self.node_signer
663			.sign_message(message.as_bytes())
664			.map_err(|_| LSPS5ProtocolError::UnknownError)
665	}
666
667	fn client_has_open_channel(&self, client_id: &PublicKey) -> bool {
668		self.channel_manager
669			.get_cm()
670			.list_channels()
671			.iter()
672			.any(|c| c.is_usable && c.counterparty.node_id == *client_id)
673	}
674
675	pub(crate) fn peer_connected(&self, counterparty_node_id: &PublicKey) {
676		let mut outer_state_lock = self.per_peer_state.write().unwrap();
677		if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) {
678			peer_state.reset_notification_cooldown();
679		}
680		self.check_prune_stale_webhooks(&mut outer_state_lock);
681	}
682
683	pub(crate) fn peer_disconnected(&self, counterparty_node_id: &PublicKey) {
684		let mut outer_state_lock = self.per_peer_state.write().unwrap();
685		if let Some(peer_state) = outer_state_lock.get_mut(counterparty_node_id) {
686			peer_state.reset_notification_cooldown();
687		}
688		self.check_prune_stale_webhooks(&mut outer_state_lock);
689	}
690}
691
692impl<CM: Deref, NS: Deref, K: Deref + Clone, TP: Deref> LSPSProtocolMessageHandler
693	for LSPS5ServiceHandler<CM, NS, K, TP>
694where
695	CM::Target: AChannelManager,
696	NS::Target: NodeSigner,
697	K::Target: KVStore,
698	TP::Target: TimeProvider,
699{
700	type ProtocolMessage = LSPS5Message;
701	const PROTOCOL_NUMBER: Option<u16> = Some(5);
702
703	fn handle_message(
704		&self, message: Self::ProtocolMessage, counterparty_node_id: &PublicKey,
705	) -> Result<(), LightningError> {
706		match message {
707			LSPS5Message::Request(request_id, request) => {
708				let res = match request {
709					LSPS5Request::SetWebhook(params) => {
710						self.handle_set_webhook(*counterparty_node_id, request_id, params)
711					},
712					LSPS5Request::ListWebhooks(params) => {
713						self.handle_list_webhooks(*counterparty_node_id, request_id, params)
714					},
715					LSPS5Request::RemoveWebhook(params) => {
716						self.handle_remove_webhook(*counterparty_node_id, request_id, params)
717					},
718				};
719				res
720			},
721			_ => {
722				debug_assert!(
723					false,
724					"Service handler received LSPS5 response message. This should never happen."
725				);
726				let err = format!(
727					"Service handler received LSPS5 response message from node {}. This should never happen.",
728					counterparty_node_id
729				);
730				Err(LightningError { err, action: ErrorAction::IgnoreAndLog(Level::Info) })
731			},
732		}
733	}
734}
735
736#[derive(Debug)]
737pub(crate) struct PeerState {
738	webhooks: Vec<(LSPS5AppName, Webhook)>,
739	needs_persist: bool,
740}
741
742impl PeerState {
743	fn webhook_mut(&mut self, name: &LSPS5AppName) -> Option<&mut Webhook> {
744		let res =
745			self.webhooks.iter_mut().find_map(|(n, h)| if n == name { Some(h) } else { None });
746		self.needs_persist |= true;
747		res
748	}
749
750	fn webhooks(&self) -> &Vec<(LSPS5AppName, Webhook)> {
751		&self.webhooks
752	}
753
754	fn webhooks_mut(&mut self) -> &mut Vec<(LSPS5AppName, Webhook)> {
755		let res = &mut self.webhooks;
756		self.needs_persist |= true;
757		res
758	}
759
760	fn webhooks_len(&self) -> usize {
761		self.webhooks.len()
762	}
763
764	fn app_names(&self) -> Vec<LSPS5AppName> {
765		self.webhooks.iter().map(|(n, _)| n).cloned().collect()
766	}
767
768	fn insert_webhook(&mut self, name: LSPS5AppName, hook: Webhook) {
769		for (n, h) in self.webhooks.iter_mut() {
770			if *n == name {
771				*h = hook;
772				return;
773			}
774		}
775
776		self.webhooks.push((name, hook));
777		self.needs_persist |= true;
778	}
779
780	fn remove_webhook(&mut self, name: &LSPS5AppName) -> bool {
781		let mut removed = false;
782		self.webhooks.retain(|(n, _)| {
783			if n != name {
784				true
785			} else {
786				removed = true;
787				false
788			}
789		});
790		self.needs_persist |= true;
791		removed
792	}
793
794	fn reset_notification_cooldown(&mut self) {
795		for (_, h) in self.webhooks.iter_mut() {
796			h.last_notification_sent = None;
797		}
798		self.needs_persist |= true;
799	}
800
801	// Returns whether the entire state is empty and can be pruned.
802	fn prune_stale_webhooks(&mut self, now: LSPSDateTime) {
803		self.webhooks.retain(|(_, webhook)| {
804			let should_prune = now.duration_since(&webhook.last_used) >= MIN_WEBHOOK_RETENTION_DAYS;
805			if should_prune {
806				self.needs_persist |= true;
807			}
808			!should_prune
809		});
810	}
811
812	fn is_prunable(&self) -> bool {
813		self.webhooks.is_empty()
814	}
815}
816
817impl Default for PeerState {
818	fn default() -> Self {
819		let webhooks = Vec::new();
820		let needs_persist = true;
821		Self { webhooks, needs_persist }
822	}
823}
824
825impl_writeable_tlv_based!(PeerState, {
826	(0, webhooks, required_vec),
827	(_unused, needs_persist, (static_value, false)),
828});