1use crate::alloc::string::ToString;
13use crate::events::EventQueue;
14use crate::lsps0::ser::{LSPSMessage, LSPSProtocolMessageHandler, LSPSRequestId};
15use crate::lsps5::event::LSPS5ClientEvent;
16use crate::lsps5::msgs::{
17 LSPS5Message, LSPS5Request, LSPS5Response, ListWebhooksRequest, RemoveWebhookRequest,
18 SetWebhookRequest,
19};
20
21use crate::message_queue::MessageQueue;
22use crate::prelude::{new_hash_map, HashMap};
23use crate::sync::{Arc, Mutex, RwLock};
24use crate::utils::generate_request_id;
25
26use super::msgs::{LSPS5AppName, LSPS5Error, LSPS5WebhookUrl};
27
28use bitcoin::secp256k1::PublicKey;
29
30use lightning::ln::msgs::{ErrorAction, LightningError};
31use lightning::sign::EntropySource;
32use lightning::util::logger::Level;
33
34use alloc::collections::VecDeque;
35use alloc::string::String;
36use lightning::util::persist::KVStore;
37
38use core::ops::Deref;
39
40impl PartialEq<LSPSRequestId> for (LSPSRequestId, (LSPS5AppName, LSPS5WebhookUrl)) {
41 fn eq(&self, other: &LSPSRequestId) -> bool {
42 &self.0 == other
43 }
44}
45
46impl PartialEq<LSPSRequestId> for (LSPSRequestId, LSPS5AppName) {
47 fn eq(&self, other: &LSPSRequestId) -> bool {
48 &self.0 == other
49 }
50}
51
/// Client-side configuration for the LSPS5 handler.
///
/// The struct currently has no fields; it is accepted by the handler's constructor and stored
/// unchanged.
#[derive(Debug, Clone, Copy, Default)]
pub struct LSPS5ClientConfig {}
55
56struct PeerState {
57 pending_set_webhook_requests: VecDeque<(LSPSRequestId, (LSPS5AppName, LSPS5WebhookUrl))>,
58 pending_list_webhooks_requests: VecDeque<LSPSRequestId>,
59 pending_remove_webhook_requests: VecDeque<(LSPSRequestId, LSPS5AppName)>,
60}
61
62const MAX_PENDING_REQUESTS: usize = 5;
63
64impl PeerState {
65 fn new() -> Self {
66 Self {
67 pending_set_webhook_requests: VecDeque::with_capacity(MAX_PENDING_REQUESTS),
68 pending_list_webhooks_requests: VecDeque::with_capacity(MAX_PENDING_REQUESTS),
69 pending_remove_webhook_requests: VecDeque::with_capacity(MAX_PENDING_REQUESTS),
70 }
71 }
72
73 fn add_request<T, F>(&mut self, item: T, queue_selector: F)
74 where
75 F: FnOnce(&mut Self) -> &mut VecDeque<T>,
76 {
77 let queue = queue_selector(self);
78 if queue.len() == MAX_PENDING_REQUESTS {
79 queue.pop_front();
80 }
81 queue.push_back(item);
82 }
83
84 fn find_and_remove_request<T, F>(
85 &mut self, queue_selector: F, request_id: &LSPSRequestId,
86 ) -> Option<T>
87 where
88 F: FnOnce(&mut Self) -> &mut VecDeque<T>,
89 T: Clone,
90 for<'a> &'a T: PartialEq<&'a LSPSRequestId>,
91 {
92 let queue = queue_selector(self);
93 if let Some(pos) = queue.iter().position(|item| item == request_id) {
94 queue.remove(pos)
95 } else {
96 None
97 }
98 }
99
100 fn is_empty(&self) -> bool {
101 self.pending_set_webhook_requests.is_empty()
102 && self.pending_list_webhooks_requests.is_empty()
103 && self.pending_remove_webhook_requests.is_empty()
104 }
105}
106
/// Client-side handler for LSPS5 webhook requests and their responses.
///
/// It sends `set_webhook`, `list_webhooks` and `remove_webhook` requests to a counterparty,
/// remembers which requests are still unanswered per peer, and turns incoming responses into
/// [`LSPS5ClientEvent`]s.
pub struct LSPS5ClientHandler<ES: Deref, K: Deref + Clone>
where
	ES::Target: EntropySource,
	K::Target: KVStore,
{
	/// Queue onto which outgoing LSPS5 request messages are enqueued.
	pending_messages: Arc<MessageQueue>,
	/// Queue onto which events produced from responses are enqueued.
	pending_events: Arc<EventQueue<K>>,
	/// Entropy source passed to `generate_request_id` when creating request ids.
	entropy_source: ES,
	/// Pending requests, keyed by the counterparty they were sent to.
	per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,
	/// Configuration supplied at construction; not read anywhere in the code shown here.
	_config: LSPS5ClientConfig,
}
139
140impl<ES: Deref, K: Deref + Clone> LSPS5ClientHandler<ES, K>
141where
142 ES::Target: EntropySource,
143 K::Target: KVStore,
144{
145 pub(crate) fn new(
147 entropy_source: ES, pending_messages: Arc<MessageQueue>,
148 pending_events: Arc<EventQueue<K>>, _config: LSPS5ClientConfig,
149 ) -> Self {
150 Self {
151 pending_messages,
152 pending_events,
153 entropy_source,
154 per_peer_state: RwLock::new(new_hash_map()),
155 _config,
156 }
157 }
158
159 fn with_peer_state<F, R>(&self, counterparty_node_id: PublicKey, f: F) -> R
160 where
161 F: FnOnce(&mut PeerState) -> R,
162 {
163 let mut outer_state_lock = self.per_peer_state.write().unwrap();
164 let inner_state_lock =
165 outer_state_lock.entry(counterparty_node_id).or_insert(Mutex::new(PeerState::new()));
166 let mut peer_state_lock = inner_state_lock.lock().unwrap();
167
168 f(&mut *peer_state_lock)
169 }
170
171 pub fn set_webhook(
201 &self, counterparty_node_id: PublicKey, app_name: String, webhook_url: String,
202 ) -> Result<LSPSRequestId, LSPS5Error> {
203 let mut message_queue_notifier = self.pending_messages.notifier();
204 let app_name = LSPS5AppName::from_string(app_name)?;
205
206 let lsps_webhook_url = LSPS5WebhookUrl::from_string(webhook_url)?;
207
208 let request_id = generate_request_id(&self.entropy_source);
209
210 self.with_peer_state(counterparty_node_id, |peer_state| {
211 peer_state.add_request(
212 (request_id.clone(), (app_name.clone(), lsps_webhook_url.clone())),
213 |s| &mut s.pending_set_webhook_requests,
214 );
215 });
216
217 let request =
218 LSPS5Request::SetWebhook(SetWebhookRequest { app_name, webhook: lsps_webhook_url });
219
220 let message = LSPS5Message::Request(request_id.clone(), request);
221 message_queue_notifier.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message));
222
223 Ok(request_id)
224 }
225
226 pub fn list_webhooks(&self, counterparty_node_id: PublicKey) -> LSPSRequestId {
243 let mut message_queue_notifier = self.pending_messages.notifier();
244 let request_id = generate_request_id(&self.entropy_source);
245
246 self.with_peer_state(counterparty_node_id, |peer_state| {
247 peer_state.add_request(request_id.clone(), |s| &mut s.pending_list_webhooks_requests);
248 });
249
250 let request = LSPS5Request::ListWebhooks(ListWebhooksRequest {});
251 let message = LSPS5Message::Request(request_id.clone(), request);
252 message_queue_notifier.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message));
253
254 request_id
255 }
256
257 pub fn remove_webhook(
277 &self, counterparty_node_id: PublicKey, app_name: String,
278 ) -> Result<LSPSRequestId, LSPS5Error> {
279 let mut message_queue_notifier = self.pending_messages.notifier();
280 let app_name = LSPS5AppName::from_string(app_name)?;
281
282 let request_id = generate_request_id(&self.entropy_source);
283
284 self.with_peer_state(counterparty_node_id, |peer_state| {
285 peer_state.add_request((request_id.clone(), app_name.clone()), |s| {
286 &mut s.pending_remove_webhook_requests
287 });
288 });
289
290 let request = LSPS5Request::RemoveWebhook(RemoveWebhookRequest { app_name });
291 let message = LSPS5Message::Request(request_id.clone(), request);
292 message_queue_notifier.enqueue(&counterparty_node_id, LSPSMessage::LSPS5(message));
293
294 Ok(request_id)
295 }
296
297 fn handle_message(
298 &self, message: LSPS5Message, counterparty_node_id: &PublicKey,
299 ) -> Result<(), LightningError> {
300 let (request_id, response) = match message {
301 LSPS5Message::Request(_, _) => {
302 return Err(LightningError {
303 err: format!(
304 "Received unexpected request message from {}",
305 counterparty_node_id
306 ),
307 action: ErrorAction::IgnoreAndLog(Level::Debug),
308 });
309 },
310 LSPS5Message::Response(rid, resp) => (rid, resp),
311 };
312 let mut result: Result<(), LightningError> = Err(LightningError {
313 err: format!("Received LSPS5 response from unknown peer: {}", counterparty_node_id),
314 action: ErrorAction::IgnoreAndLog(Level::Debug),
315 });
316 let event_queue_notifier = self.pending_events.notifier();
317 let handle_response = |peer_state: &mut PeerState| {
318 if let Some((_, (app_name, webhook_url))) = peer_state
319 .find_and_remove_request(|s| &mut s.pending_set_webhook_requests, &request_id)
320 {
321 match &response {
322 LSPS5Response::SetWebhook(r) => {
323 event_queue_notifier.enqueue(LSPS5ClientEvent::WebhookRegistered {
324 counterparty_node_id: *counterparty_node_id,
325 num_webhooks: r.num_webhooks,
326 max_webhooks: r.max_webhooks,
327 no_change: r.no_change,
328 app_name,
329 url: webhook_url,
330 request_id,
331 });
332 result = Ok(());
333 },
334 LSPS5Response::SetWebhookError(e) => {
335 event_queue_notifier.enqueue(LSPS5ClientEvent::WebhookRegistrationFailed {
336 counterparty_node_id: *counterparty_node_id,
337 error: e.clone().into(),
338 app_name,
339 url: webhook_url,
340 request_id,
341 });
342 result = Ok(());
343 },
344 _ => {
345 result = Err(LightningError {
346 err: "Unexpected response type for SetWebhook".to_string(),
347 action: ErrorAction::IgnoreAndLog(Level::Error),
348 });
349 },
350 }
351 } else if let Some(_) = peer_state
352 .find_and_remove_request(|s| &mut s.pending_list_webhooks_requests, &request_id)
353 {
354 match &response {
355 LSPS5Response::ListWebhooks(r) => {
356 event_queue_notifier.enqueue(LSPS5ClientEvent::WebhooksListed {
357 counterparty_node_id: *counterparty_node_id,
358 app_names: r.app_names.clone(),
359 max_webhooks: r.max_webhooks,
360 request_id,
361 });
362 result = Ok(());
363 },
364 _ => {
365 result = Err(LightningError {
366 err: "Unexpected response type for ListWebhooks".to_string(),
367 action: ErrorAction::IgnoreAndLog(Level::Error),
368 });
369 },
370 }
371 } else if let Some((_, app_name)) = peer_state
372 .find_and_remove_request(|s| &mut s.pending_remove_webhook_requests, &request_id)
373 {
374 match &response {
375 LSPS5Response::RemoveWebhook(_) => {
376 event_queue_notifier.enqueue(LSPS5ClientEvent::WebhookRemoved {
377 counterparty_node_id: *counterparty_node_id,
378 app_name,
379 request_id,
380 });
381 result = Ok(());
382 },
383 LSPS5Response::RemoveWebhookError(e) => {
384 event_queue_notifier.enqueue(LSPS5ClientEvent::WebhookRemovalFailed {
385 counterparty_node_id: *counterparty_node_id,
386 error: e.clone().into(),
387 app_name,
388 request_id,
389 });
390 result = Ok(());
391 },
392 _ => {
393 result = Err(LightningError {
394 err: "Unexpected response type for RemoveWebhook".to_string(),
395 action: ErrorAction::IgnoreAndLog(Level::Error),
396 });
397 },
398 }
399 } else {
400 result = Err(LightningError {
401 err: format!("Received response for unknown request ID: {}", request_id.0),
402 action: ErrorAction::IgnoreAndLog(Level::Debug),
403 });
404 }
405 };
406 self.with_peer_state(*counterparty_node_id, handle_response);
407
408 self.check_and_remove_empty_peer_state(counterparty_node_id);
409
410 result
411 }
412
413 fn check_and_remove_empty_peer_state(&self, counterparty_node_id: &PublicKey) {
414 let mut outer_state_lock = self.per_peer_state.write().unwrap();
415 let should_remove =
416 if let Some(peer_state_mutex) = outer_state_lock.get(counterparty_node_id) {
417 let peer_state = peer_state_mutex.lock().unwrap();
418 peer_state.is_empty()
419 } else {
420 false
421 };
422
423 if should_remove {
424 outer_state_lock.remove(counterparty_node_id);
425 }
426 }
427}
428
/// Exposes the handler through the generic LSPS protocol-handler interface, registered under
/// protocol number 5.
impl<ES: Deref, K: Deref + Clone> LSPSProtocolMessageHandler for LSPS5ClientHandler<ES, K>
where
	ES::Target: EntropySource,
	K::Target: KVStore,
{
	type ProtocolMessage = LSPS5Message;
	const PROTOCOL_NUMBER: Option<u16> = Some(5);

	/// Forwards the message to the inherent `handle_message` of [`LSPS5ClientHandler`].
	fn handle_message(
		&self, message: Self::ProtocolMessage, lsp_node_id: &PublicKey,
	) -> Result<(), LightningError> {
		// Method-call syntax prefers the inherent method of the same name over this trait
		// method, so this delegates rather than recursing.
		self.handle_message(message, lsp_node_id)
	}
}
443
444#[cfg(all(test, feature = "time"))]
445mod tests {
446
447 use super::*;
448 use crate::{lsps0::ser::LSPSRequestId, lsps5::msgs::SetWebhookResponse};
449 use bitcoin::{key::Secp256k1, secp256k1::SecretKey};
450 use core::sync::atomic::{AtomicU64, Ordering};
451 use lightning::util::persist::KVStoreSyncWrapper;
452 use lightning::util::test_utils::TestStore;
453 use lightning::util::wakers::Notifier;
454
455 struct UniqueTestEntropy {
456 counter: AtomicU64,
457 }
458
459 impl EntropySource for UniqueTestEntropy {
460 fn get_secure_random_bytes(&self) -> [u8; 32] {
461 let counter = self.counter.fetch_add(1, Ordering::SeqCst);
462 let mut bytes = [0u8; 32];
463 bytes[0..8].copy_from_slice(&counter.to_be_bytes());
464 bytes
465 }
466 }
467
468 fn setup_test_client() -> (
469 LSPS5ClientHandler<Arc<UniqueTestEntropy>, Arc<KVStoreSyncWrapper<Arc<TestStore>>>>,
470 Arc<MessageQueue>,
471 Arc<EventQueue<Arc<KVStoreSyncWrapper<Arc<TestStore>>>>>,
472 PublicKey,
473 PublicKey,
474 ) {
475 let test_entropy_source = Arc::new(UniqueTestEntropy { counter: AtomicU64::new(2) });
476 let notifier = Arc::new(Notifier::new());
477 let message_queue = Arc::new(MessageQueue::new(notifier));
478
479 let kv_store = Arc::new(KVStoreSyncWrapper(Arc::new(TestStore::new(false))));
480 let persist_notifier = Arc::new(Notifier::new());
481 let event_queue = Arc::new(EventQueue::new(VecDeque::new(), kv_store, persist_notifier));
482 let client = LSPS5ClientHandler::new(
483 test_entropy_source,
484 Arc::clone(&message_queue),
485 Arc::clone(&event_queue),
486 LSPS5ClientConfig::default(),
487 );
488
489 let secp = Secp256k1::new();
490 let secret_key_1 = SecretKey::from_slice(&[42u8; 32]).unwrap();
491 let secret_key_2 = SecretKey::from_slice(&[43u8; 32]).unwrap();
492 let peer_1 = PublicKey::from_secret_key(&secp, &secret_key_1);
493 let peer_2 = PublicKey::from_secret_key(&secp, &secret_key_2);
494
495 (client, message_queue, event_queue, peer_1, peer_2)
496 }
497
498 #[test]
499 fn test_per_peer_state_isolation() {
500 let (client, _, _, peer_1, peer_2) = setup_test_client();
501
502 let req_id_1 = client
503 .set_webhook(peer_1, "test-app-1".to_string(), "https://example.com/hook1".to_string())
504 .unwrap();
505 let req_id_2 = client
506 .set_webhook(peer_2, "test-app-2".to_string(), "https://example.com/hook2".to_string())
507 .unwrap();
508
509 {
510 let outer_state_lock = client.per_peer_state.read().unwrap();
511
512 let peer_1_state = outer_state_lock.get(&peer_1).unwrap().lock().unwrap();
513 assert!(peer_1_state
514 .pending_set_webhook_requests
515 .iter()
516 .any(|(id, _)| id == &req_id_1));
517
518 let peer_2_state = outer_state_lock.get(&peer_2).unwrap().lock().unwrap();
519 assert!(peer_2_state
520 .pending_set_webhook_requests
521 .iter()
522 .any(|(id, _)| id == &req_id_2));
523 }
524 }
525
526 #[test]
527 fn test_pending_request_tracking() {
528 let (client, _, _, peer, _) = setup_test_client();
529 const APP_NAME: &str = "test-app";
530 const WEBHOOK_URL: &str = "https://example.com/hook";
531 let lsps5_app_name = LSPS5AppName::from_string(APP_NAME.to_string()).unwrap();
532 let lsps5_webhook_url = LSPS5WebhookUrl::from_string(WEBHOOK_URL.to_string()).unwrap();
533 let set_req_id =
534 client.set_webhook(peer, APP_NAME.to_string(), WEBHOOK_URL.to_string()).unwrap();
535 let list_req_id = client.list_webhooks(peer);
536 let remove_req_id = client.remove_webhook(peer, "test-app".to_string()).unwrap();
537
538 {
539 let outer_state_lock = client.per_peer_state.read().unwrap();
540 let peer_state = outer_state_lock.get(&peer).unwrap().lock().unwrap();
541 let set_request = peer_state
542 .pending_set_webhook_requests
543 .iter()
544 .find(|(id, _)| id == &set_req_id)
545 .unwrap();
546 assert_eq!(&set_request.1, &(lsps5_app_name.clone(), lsps5_webhook_url));
547
548 assert!(peer_state.pending_list_webhooks_requests.contains(&list_req_id));
549
550 let remove_request = peer_state
551 .pending_remove_webhook_requests
552 .iter()
553 .find(|(id, _)| id == &remove_req_id)
554 .unwrap();
555 assert_eq!(&remove_request.1, &lsps5_app_name);
556 }
557 }
558
559 #[test]
560 fn test_unknown_request_id_handling() {
561 let (client, _message_queue, _, peer, _) = setup_test_client();
562
563 let _valid_req = client
564 .set_webhook(peer, "test-app".to_string(), "https://example.com/hook".to_string())
565 .unwrap();
566
567 let unknown_req_id = LSPSRequestId("unknown:request:id".to_string());
568 let response = LSPS5Response::SetWebhook(SetWebhookResponse {
569 num_webhooks: 1,
570 max_webhooks: 5,
571 no_change: false,
572 });
573 let response_msg = LSPS5Message::Response(unknown_req_id, response);
574
575 let result = client.handle_message(response_msg, &peer);
576 assert!(result.is_err());
577 let error = result.unwrap_err();
578 assert!(error.err.to_lowercase().contains("unknown request id"));
579 }
580
581 #[test]
582 fn test_pending_request_eviction() {
583 let (client, _, _, peer, _) = setup_test_client();
584
585 let mut request_ids = Vec::new();
586 for i in 0..MAX_PENDING_REQUESTS {
587 let req_id = client
588 .set_webhook(peer, format!("app-{}", i), format!("https://example.com/hook{}", i))
589 .unwrap();
590 request_ids.push(req_id);
591 }
592
593 {
594 let outer_state_lock = client.per_peer_state.read().unwrap();
595 let peer_state = outer_state_lock.get(&peer).unwrap().lock().unwrap();
596 for req_id in &request_ids {
597 assert!(peer_state.pending_set_webhook_requests.iter().any(|(id, _)| id == req_id));
598 }
599 assert_eq!(peer_state.pending_set_webhook_requests.len(), MAX_PENDING_REQUESTS);
600 }
601
602 let new_req_id = client
603 .set_webhook(peer, "app-new".to_string(), "https://example.com/hook-new".to_string())
604 .unwrap();
605
606 {
607 let outer_state_lock = client.per_peer_state.read().unwrap();
608 let peer_state = outer_state_lock.get(&peer).unwrap().lock().unwrap();
609 assert_eq!(peer_state.pending_set_webhook_requests.len(), MAX_PENDING_REQUESTS);
610
611 assert!(!peer_state
612 .pending_set_webhook_requests
613 .iter()
614 .any(|(id, _)| id == &request_ids[0]));
615
616 for req_id in &request_ids[1..] {
617 assert!(peer_state.pending_set_webhook_requests.iter().any(|(id, _)| id == req_id));
618 }
619
620 assert!(peer_state
621 .pending_set_webhook_requests
622 .iter()
623 .any(|(id, _)| id == &new_req_id));
624 }
625 }
626
627 #[test]
628 fn test_peer_state_cleanup_and_recreation() {
629 let (client, _, _, peer, _) = setup_test_client();
630
631 let set_webhook_req_id = client
632 .set_webhook(peer, "test-app".to_string(), "https://example.com/hook".to_string())
633 .unwrap();
634
635 let list_webhooks_req_id = client.list_webhooks(peer);
636
637 {
638 let state = client.per_peer_state.read().unwrap();
639 assert!(state.contains_key(&peer));
640 let peer_state = state.get(&peer).unwrap().lock().unwrap();
641 assert!(peer_state
642 .pending_set_webhook_requests
643 .iter()
644 .any(|(id, _)| id == &set_webhook_req_id));
645 assert!(peer_state.pending_list_webhooks_requests.contains(&list_webhooks_req_id));
646 }
647
648 let set_webhook_response = LSPS5Response::SetWebhook(SetWebhookResponse {
649 num_webhooks: 1,
650 max_webhooks: 5,
651 no_change: false,
652 });
653 let response_msg = LSPS5Message::Response(set_webhook_req_id.clone(), set_webhook_response);
654 client.handle_message(response_msg, &peer).unwrap();
657
658 {
659 let state = client.per_peer_state.read().unwrap();
660 assert!(state.contains_key(&peer));
661 let peer_state = state.get(&peer).unwrap().lock().unwrap();
662 assert!(!peer_state
663 .pending_set_webhook_requests
664 .iter()
665 .any(|(id, _)| id == &set_webhook_req_id));
666 assert!(peer_state.pending_list_webhooks_requests.contains(&list_webhooks_req_id));
667 }
668
669 let list_webhooks_response =
670 LSPS5Response::ListWebhooks(crate::lsps5::msgs::ListWebhooksResponse {
671 app_names: vec![],
672 max_webhooks: 5,
673 });
674 let response_msg = LSPS5Message::Response(list_webhooks_req_id, list_webhooks_response);
675
676 client.handle_message(response_msg, &peer).unwrap();
678
679 {
680 let state = client.per_peer_state.read().unwrap();
681 assert!(!state.contains_key(&peer));
682 }
683
684 let new_req_id = client
686 .set_webhook(peer, "test-app-2".to_string(), "https://example.com/hook2".to_string())
687 .unwrap();
688
689 {
690 let state = client.per_peer_state.read().unwrap();
691 assert!(state.contains_key(&peer));
692 let peer_state = state.get(&peer).unwrap().lock().unwrap();
693 assert!(peer_state
694 .pending_set_webhook_requests
695 .iter()
696 .any(|(id, _)| id == &new_req_id));
697 }
698 }
699}