1use bitcoin::hashes::{Hash, HashEngine};
14use bitcoin::hashes::hmac::{Hmac, HmacEngine};
15use bitcoin::hashes::sha256::Hash as Sha256;
16use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
17
18use crate::blinded_path::{IntroductionNode, NodeIdLookUp};
19use crate::blinded_path::message::{BlindedMessagePath, MessageForwardNode, ForwardTlvs, MessageContext, NextMessageHop, ReceiveTlvs};
20use crate::blinded_path::utils;
21use crate::events::{Event, EventHandler, EventsProvider, ReplayEvent};
22use crate::sign::{EntropySource, NodeSigner, Recipient};
23use crate::types::features::{InitFeatures, NodeFeatures};
24use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress};
25use crate::ln::onion_utils;
26use crate::routing::gossip::{NetworkGraph, NodeId, ReadOnlyNetworkGraph};
27use super::async_payments::AsyncPaymentsMessageHandler;
28#[cfg(async_payments)]
29use super::async_payments::AsyncPaymentsMessage;
30use super::dns_resolution::{DNSResolverMessageHandler, DNSResolverMessage};
31use super::packet::OnionMessageContents;
32use super::packet::ParsedOnionMessageContents;
33use super::offers::OffersMessageHandler;
34use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN};
35use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture};
36use crate::util::logger::{Logger, WithContext};
37use crate::util::ser::Writeable;
38use crate::util::wakers::{Future, Notifier};
39
40use core::fmt;
41use core::ops::Deref;
42use core::sync::atomic::{AtomicBool, Ordering};
43use crate::io;
44use crate::sync::Mutex;
45use crate::prelude::*;
46
47#[cfg(not(c_bindings))]
48use {
49 crate::sign::KeysManager,
50 crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager},
51 crate::ln::peer_handler::IgnoringMessageHandler,
52 crate::sync::Arc,
53};
54
// Upper bound, in timer ticks, used by sibling modules (`pub(super)`).
// NOTE(review): the use site is outside the visible chunk; presumably this bounds how long an
// `OnionMessageRecipient::PendingConnection` may wait before its buffered messages are dropped —
// confirm against the timer-tick handler.
pub(super) const MAX_TIMER_TICKS: usize = 2;
56
/// A trait describing any [`OnionMessenger`], whatever its concrete type parameters.
///
/// Every generic parameter of [`OnionMessenger`] is mirrored here as a pair of associated types:
/// the `Deref` wrapper (e.g. [`Self::ES`]) and the type it dereferences to (e.g.
/// [`Self::EntropySource`]).
pub trait AOnionMessenger {
	/// A type implementing [`EntropySource`].
	type EntropySource: EntropySource + ?Sized;
	/// A type that may be dereferenced to [`Self::EntropySource`].
	type ES: Deref<Target = Self::EntropySource>;
	/// A type implementing [`NodeSigner`].
	type NodeSigner: NodeSigner + ?Sized;
	/// A type that may be dereferenced to [`Self::NodeSigner`].
	type NS: Deref<Target = Self::NodeSigner>;
	/// A type implementing [`Logger`].
	type Logger: Logger + ?Sized;
	/// A type that may be dereferenced to [`Self::Logger`].
	type L: Deref<Target = Self::Logger>;
	/// A type implementing [`NodeIdLookUp`].
	type NodeIdLookUp: NodeIdLookUp + ?Sized;
	/// A type that may be dereferenced to [`Self::NodeIdLookUp`].
	type NL: Deref<Target = Self::NodeIdLookUp>;
	/// A type implementing [`MessageRouter`].
	type MessageRouter: MessageRouter + ?Sized;
	/// A type that may be dereferenced to [`Self::MessageRouter`].
	type MR: Deref<Target = Self::MessageRouter>;
	/// A type implementing [`OffersMessageHandler`].
	type OffersMessageHandler: OffersMessageHandler + ?Sized;
	/// A type that may be dereferenced to [`Self::OffersMessageHandler`].
	type OMH: Deref<Target = Self::OffersMessageHandler>;
	/// A type implementing [`AsyncPaymentsMessageHandler`].
	type AsyncPaymentsMessageHandler: AsyncPaymentsMessageHandler + ?Sized;
	/// A type that may be dereferenced to [`Self::AsyncPaymentsMessageHandler`].
	type APH: Deref<Target = Self::AsyncPaymentsMessageHandler>;
	/// A type implementing [`DNSResolverMessageHandler`].
	type DNSResolverMessageHandler: DNSResolverMessageHandler + ?Sized;
	/// A type that may be dereferenced to [`Self::DNSResolverMessageHandler`].
	type DRH: Deref<Target = Self::DNSResolverMessageHandler>;
	/// A type implementing [`CustomOnionMessageHandler`].
	type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized;
	/// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`].
	type CMH: Deref<Target = Self::CustomOnionMessageHandler>;
	/// Returns a reference to the underlying [`OnionMessenger`].
	fn get_om(&self) -> &OnionMessenger<Self::ES, Self::NS, Self::L, Self::NL, Self::MR, Self::OMH, Self::APH, Self::DRH, Self::CMH>;
}
101
// Blanket implementation of `AOnionMessenger` for every `OnionMessenger`: each associated type is
// forwarded to the corresponding generic parameter (or its `Deref::Target`), and `get_om` simply
// returns `self`.
impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, DRH: Deref, CMH: Deref> AOnionMessenger
for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, DRH, CMH> where
	ES::Target: EntropySource,
	NS::Target: NodeSigner,
	L::Target: Logger,
	NL::Target: NodeIdLookUp,
	MR::Target: MessageRouter,
	OMH::Target: OffersMessageHandler,
	APH:: Target: AsyncPaymentsMessageHandler,
	DRH::Target: DNSResolverMessageHandler,
	CMH::Target: CustomOnionMessageHandler,
{
	type EntropySource = ES::Target;
	type ES = ES;
	type NodeSigner = NS::Target;
	type NS = NS;
	type Logger = L::Target;
	type L = L;
	type NodeIdLookUp = NL::Target;
	type NL = NL;
	type MessageRouter = MR::Target;
	type MR = MR;
	type OffersMessageHandler = OMH::Target;
	type OMH = OMH;
	type AsyncPaymentsMessageHandler = APH::Target;
	type APH = APH;
	type DNSResolverMessageHandler = DRH::Target;
	type DRH = DRH;
	type CustomOnionMessageHandler = CMH::Target;
	type CMH = CMH;
	// The messenger is its own `OnionMessenger`, so no lookup is needed.
	fn get_om(&self) -> &OnionMessenger<ES, NS, L, NL, MR, OMH, APH, DRH, CMH> { self }
}
134
/// Builds [`OnionMessage`]s, buffers them per first-hop peer, and holds the router and message
/// handlers used to process them.
///
/// Construct one with [`OnionMessenger::new`] or
/// [`OnionMessenger::new_with_offline_peer_interception`].
pub struct OnionMessenger<
	ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, DRH: Deref, CMH: Deref
> where
	ES::Target: EntropySource,
	NS::Target: NodeSigner,
	L::Target: Logger,
	NL::Target: NodeIdLookUp,
	MR::Target: MessageRouter,
	OMH::Target: OffersMessageHandler,
	APH::Target: AsyncPaymentsMessageHandler,
	DRH::Target: DNSResolverMessageHandler,
	CMH::Target: CustomOnionMessageHandler,
{
	// Source of the random bytes used for blinding secrets and packet PRNG seeds.
	entropy_source: ES,
	// Provides our node id and performs ECDH on our behalf.
	node_signer: NS,
	logger: L,
	// Outbound message queues keyed by the first-hop node id; see `OnionMessageRecipient`.
	message_recipients: Mutex<HashMap<PublicKey, OnionMessageRecipient>>,
	// Randomized once at construction from `entropy_source`.
	secp_ctx: Secp256k1<secp256k1::All>,
	// Resolves short channel ids to node ids when handling blinded paths.
	node_id_lookup: NL,
	// Finds paths to destinations and creates blinded reply paths.
	message_router: MR,
	offers_handler: OMH,
	// NOTE(review): presumably only read when `cfg(async_payments)` is set, hence the
	// `allow(unused)` — confirm.
	#[allow(unused)]
	async_payments_handler: APH,
	dns_resolver_handler: DRH,
	custom_handler: CMH,
	// `true` only when built via `new_with_offline_peer_interception`.
	intercept_messages_for_offline_peers: bool,
	// Event queues. NOTE(review): the code that fills and drains them is outside the visible
	// chunk.
	pending_intercepted_msgs_events: Mutex<Vec<Event>>,
	pending_peer_connected_events: Mutex<Vec<Event>>,
	// Flag cleared by `drop_handled_events_and_abort!` when event handling fails; presumably it
	// guards against concurrent event processing — confirm at the use site.
	pending_events_processor: AtomicBool,
	// Notified when a message is queued for a not-yet-connected peer and when event handling
	// aborts.
	event_notifier: Notifier,
}
286
/// The per-peer state held in `OnionMessenger::message_recipients`: a queue of buffered
/// [`OnionMessage`]s plus whether the peer is currently connected.
enum OnionMessageRecipient {
	/// A connected peer and the messages buffered for it.
	ConnectedPeer(VecDeque<OnionMessage>),

	/// A peer we are not yet connected to: its buffered messages, the addresses it may be
	/// reached at (if any), and a `usize` that starts at `0`.
	// NOTE(review): the `usize` presumably counts timer ticks spent waiting (cf.
	// `MAX_TIMER_TICKS`); the code updating it is outside the visible chunk.
	PendingConnection(VecDeque<OnionMessage>, Option<Vec<SocketAddress>>, usize),
}
296
297impl OnionMessageRecipient {
298 fn pending_connection(addresses: Vec<SocketAddress>) -> Self {
299 Self::PendingConnection(VecDeque::new(), Some(addresses), 0)
300 }
301
302 fn pending_messages(&self) -> &VecDeque<OnionMessage> {
303 match self {
304 OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages,
305 OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages,
306 }
307 }
308
309 fn enqueue_message(&mut self, message: OnionMessage) {
310 let pending_messages = match self {
311 OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages,
312 OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages,
313 };
314
315 pending_messages.push_back(message);
316 }
317
318 fn dequeue_message(&mut self) -> Option<OnionMessage> {
319 let pending_messages = match self {
320 OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages,
321 OnionMessageRecipient::PendingConnection(pending_messages, _, _) => {
322 debug_assert!(false);
323 pending_messages
324 },
325 };
326
327 pending_messages.pop_front()
328 }
329
330 #[cfg(test)]
331 fn release_pending_messages(&mut self) -> VecDeque<OnionMessage> {
332 let pending_messages = match self {
333 OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages,
334 OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages,
335 };
336
337 core::mem::take(pending_messages)
338 }
339
340 fn mark_connected(&mut self) {
341 if let OnionMessageRecipient::PendingConnection(pending_messages, _, _) = self {
342 let mut new_pending_messages = VecDeque::new();
343 core::mem::swap(pending_messages, &mut new_pending_messages);
344 *self = OnionMessageRecipient::ConnectedPeer(new_pending_messages);
345 }
346 }
347
348 fn is_connected(&self) -> bool {
349 match self {
350 OnionMessageRecipient::ConnectedPeer(..) => true,
351 OnionMessageRecipient::PendingConnection(..) => false,
352 }
353 }
354}
355
356
/// Holds the reply path of a received message and turns it into a [`ResponseInstruction`] via
/// [`Responder::respond`] or [`Responder::respond_with_reply_path`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Responder {
	/// The path along which a response should be sent.
	reply_path: BlindedMessagePath,
}

// Serialized as a TLV stream in which `reply_path` is the required type-0 record.
impl_writeable_tlv_based!(Responder, {
	(0, reply_path, required),
});
368
369impl Responder {
370 pub(super) fn new(reply_path: BlindedMessagePath) -> Self {
372 Responder {
373 reply_path,
374 }
375 }
376
377 pub fn respond(self) -> ResponseInstruction {
381 ResponseInstruction {
382 destination: Destination::BlindedPath(self.reply_path),
383 context: None,
384 }
385 }
386
387 pub fn respond_with_reply_path(self, context: MessageContext) -> ResponseInstruction {
391 ResponseInstruction {
392 destination: Destination::BlindedPath(self.reply_path),
393 context: Some(context),
394 }
395 }
396}
397
/// Instructions for how and where to send the response to an onion message; created by
/// [`Responder`].
#[derive(Clone)]
pub struct ResponseInstruction {
	/// Where the response is sent.
	destination: Destination,
	/// When `Some`, a reply path built from this context is attached to the response; when
	/// `None`, the response is sent without a reply path.
	context: Option<MessageContext>,
}
407
408impl ResponseInstruction {
409 pub fn into_instructions(self) -> MessageSendInstructions {
412 MessageSendInstructions::ForReply { instructions: self }
413 }
414}
415
/// Instructions for how and where to send a message.
#[derive(Clone)]
pub enum MessageSendInstructions {
	/// Send to `destination` and attach exactly the given `reply_path`.
	WithSpecifiedReplyPath {
		/// The destination where the message is sent.
		destination: Destination,
		/// The reply path to include in the message.
		reply_path: BlindedMessagePath,
	},
	/// Send to `destination` and attach a reply path that the messenger creates from
	/// `context`.
	WithReplyPath {
		/// The destination where the message is sent.
		destination: Destination,
		/// The context placed in the generated reply path.
		context: MessageContext,
	},
	/// Send to `destination` without any reply path.
	WithoutReplyPath {
		/// The destination where the message is sent.
		destination: Destination,
	},
	/// Send a response as described by a [`ResponseInstruction`]; a reply path is attached only
	/// if the instruction carries a context.
	ForReply {
		/// The instructions provided by the [`Responder`].
		instructions: ResponseInstruction,
	},
}
448
449pub trait MessageRouter {
451 fn find_path(
453 &self, sender: PublicKey, peers: Vec<PublicKey>, destination: Destination
454 ) -> Result<OnionMessagePath, ()>;
455
456 fn create_blinded_paths<
459 T: secp256k1::Signing + secp256k1::Verification
460 >(
461 &self, recipient: PublicKey, context: MessageContext, peers: Vec<PublicKey>, secp_ctx: &Secp256k1<T>,
462 ) -> Result<Vec<BlindedMessagePath>, ()>;
463
464 fn create_compact_blinded_paths<
478 T: secp256k1::Signing + secp256k1::Verification
479 >(
480 &self, recipient: PublicKey, context: MessageContext,
481 peers: Vec<MessageForwardNode>, secp_ctx: &Secp256k1<T>,
482 ) -> Result<Vec<BlindedMessagePath>, ()> {
483 let peers = peers
484 .into_iter()
485 .map(|MessageForwardNode { node_id, short_channel_id: _ }| node_id)
486 .collect();
487 self.create_blinded_paths(recipient, context, peers, secp_ctx)
488 }
489}
490
/// A [`MessageRouter`] backed by a [`NetworkGraph`].
///
/// The paths it finds never contain intermediate nodes: the first node of the destination must
/// either be a given peer or the sender, or be announced in the graph with onion-message
/// support and at least one address to connect to.
pub struct DefaultMessageRouter<G: Deref<Target=NetworkGraph<L>>, L: Deref, ES: Deref>
where
	L::Target: Logger,
	ES::Target: EntropySource,
{
	/// Graph consulted for node announcements and channel counts.
	network_graph: G,
	/// Randomness used when building blinded paths.
	entropy_source: ES,
}
507
508impl<G: Deref<Target=NetworkGraph<L>>, L: Deref, ES: Deref> DefaultMessageRouter<G, L, ES>
509where
510 L::Target: Logger,
511 ES::Target: EntropySource,
512{
513 pub fn new(network_graph: G, entropy_source: ES) -> Self {
515 Self { network_graph, entropy_source }
516 }
517
518 fn create_blinded_paths_from_iter<
519 I: ExactSizeIterator<Item = MessageForwardNode>,
520 T: secp256k1::Signing + secp256k1::Verification
521 >(
522 network_graph: &G, recipient: PublicKey, context: MessageContext, peers: I,
523 entropy_source: &ES, secp_ctx: &Secp256k1<T>, compact_paths: bool,
524 ) -> Result<Vec<BlindedMessagePath>, ()> {
525 const MAX_PATHS: usize = 3;
527
528 const MIN_PEER_CHANNELS: usize = 3;
531
532 let network_graph = network_graph.deref().read_only();
533 let is_recipient_announced =
534 network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient));
535
536 let has_one_peer = peers.len() == 1;
537 let mut peer_info = peers
538 .filter_map(|peer|
540 network_graph
541 .node(&NodeId::from_pubkey(&peer.node_id))
542 .filter(|info|
543 !is_recipient_announced || info.channels.len() >= MIN_PEER_CHANNELS
544 )
545 .map(|info| (peer, info.is_tor_only(), info.channels.len()))
546 .or_else(|| (!is_recipient_announced && has_one_peer)
548 .then(|| (peer, false, 0))
549 )
550 )
551 .filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced))
553 .collect::<Vec<_>>();
554
555 peer_info.sort_unstable_by(|(_, a_tor_only, a_channels), (_, b_tor_only, b_channels)| {
557 a_tor_only.cmp(b_tor_only).then(a_channels.cmp(b_channels).reverse())
558 });
559
560 let paths = peer_info.into_iter()
561 .map(|(peer, _, _)| {
562 BlindedMessagePath::new(&[peer], recipient, context.clone(), &**entropy_source, secp_ctx)
563 })
564 .take(MAX_PATHS)
565 .collect::<Result<Vec<_>, _>>();
566
567 let mut paths = match paths {
568 Ok(paths) if !paths.is_empty() => Ok(paths),
569 _ => {
570 if is_recipient_announced {
571 BlindedMessagePath::new(&[], recipient, context, &**entropy_source, secp_ctx)
572 .map(|path| vec![path])
573 } else {
574 Err(())
575 }
576 },
577 }?;
578
579 if compact_paths {
580 for path in &mut paths {
581 path.use_compact_introduction_node(&network_graph);
582 }
583 }
584
585 Ok(paths)
586 }
587
588 pub(crate) fn find_path(
589 network_graph: &G, sender: PublicKey, peers: Vec<PublicKey>, mut destination: Destination
590 ) -> Result<OnionMessagePath, ()> {
591 let network_graph = network_graph.deref().read_only();
592 destination.resolve(&network_graph);
593
594 let first_node = match destination.first_node() {
595 Some(first_node) => first_node,
596 None => return Err(()),
597 };
598
599 if peers.contains(&first_node) || sender == first_node {
600 Ok(OnionMessagePath {
601 intermediate_nodes: vec![], destination, first_node_addresses: None
602 })
603 } else {
604 let node_details = network_graph
605 .node(&NodeId::from_pubkey(&first_node))
606 .and_then(|node_info| node_info.announcement_info.as_ref())
607 .map(|announcement_info| (announcement_info.features(), announcement_info.addresses()));
608
609 match node_details {
610 Some((features, addresses)) if features.supports_onion_messages() && addresses.len() > 0 => {
611 let first_node_addresses = Some(addresses.to_vec());
612 Ok(OnionMessagePath {
613 intermediate_nodes: vec![], destination, first_node_addresses
614 })
615 },
616 _ => Err(()),
617 }
618 }
619 }
620
621 pub(crate) fn create_blinded_paths<
622 T: secp256k1::Signing + secp256k1::Verification
623 >(
624 network_graph: &G, recipient: PublicKey, context: MessageContext,
625 peers: Vec<PublicKey>, entropy_source: &ES, secp_ctx: &Secp256k1<T>,
626 ) -> Result<Vec<BlindedMessagePath>, ()> {
627 let peers = peers
628 .into_iter()
629 .map(|node_id| MessageForwardNode { node_id, short_channel_id: None });
630 Self::create_blinded_paths_from_iter(network_graph, recipient, context, peers.into_iter(), entropy_source, secp_ctx, false)
631 }
632
633 pub(crate) fn create_compact_blinded_paths<
634 T: secp256k1::Signing + secp256k1::Verification
635 >(
636 network_graph: &G, recipient: PublicKey, context: MessageContext,
637 peers: Vec<MessageForwardNode>, entropy_source: &ES, secp_ctx: &Secp256k1<T>,
638 ) -> Result<Vec<BlindedMessagePath>, ()> {
639 Self::create_blinded_paths_from_iter(network_graph, recipient, context, peers.into_iter(), entropy_source, secp_ctx, true)
640 }
641}
642
643impl<G: Deref<Target=NetworkGraph<L>>, L: Deref, ES: Deref> MessageRouter for DefaultMessageRouter<G, L, ES>
644where
645 L::Target: Logger,
646 ES::Target: EntropySource,
647{
648 fn find_path(
649 &self, sender: PublicKey, peers: Vec<PublicKey>, destination: Destination
650 ) -> Result<OnionMessagePath, ()> {
651 Self::find_path(&self.network_graph, sender, peers, destination)
652 }
653
654 fn create_blinded_paths<
655 T: secp256k1::Signing + secp256k1::Verification
656 >(
657 &self, recipient: PublicKey, context: MessageContext, peers: Vec<PublicKey>, secp_ctx: &Secp256k1<T>,
658 ) -> Result<Vec<BlindedMessagePath>, ()> {
659 Self::create_blinded_paths(&self.network_graph, recipient, context, peers, &self.entropy_source, secp_ctx)
660 }
661
662 fn create_compact_blinded_paths<
663 T: secp256k1::Signing + secp256k1::Verification
664 >(
665 &self, recipient: PublicKey, context: MessageContext, peers: Vec<MessageForwardNode>, secp_ctx: &Secp256k1<T>,
666 ) -> Result<Vec<BlindedMessagePath>, ()> {
667 Self::create_compact_blinded_paths(&self.network_graph, recipient, context, peers, &self.entropy_source, secp_ctx)
668 }
669
670}
671
/// A path for sending an [`OnionMessage`].
#[derive(Clone)]
pub struct OnionMessagePath {
	/// Nodes on the path between the sender and the destination.
	pub intermediate_nodes: Vec<PublicKey>,

	/// The recipient of the message.
	pub destination: Destination,

	/// Addresses at which the first node on the path may be reached.
	///
	/// When the first node is not yet a known recipient, the messenger needs these to queue the
	/// message for a pending connection; with `None` the send fails with
	/// [`SendError::InvalidFirstHop`].
	pub first_node_addresses: Option<Vec<SocketAddress>>,
}
687
688impl OnionMessagePath {
689 pub fn first_node(&self) -> Option<PublicKey> {
691 self.intermediate_nodes
692 .first()
693 .copied()
694 .or_else(|| self.destination.first_node())
695 }
696}
697
/// The destination of an onion message.
#[derive(Clone, Hash, Debug, PartialEq, Eq)]
pub enum Destination {
	/// We're sending this onion message to a node identified by its public key.
	Node(PublicKey),
	/// We're sending this onion message to a blinded path.
	BlindedPath(BlindedMessagePath),
}
706
707impl Destination {
708 pub fn resolve(&mut self, network_graph: &ReadOnlyNetworkGraph) {
712 if let Destination::BlindedPath(path) = self {
713 if let IntroductionNode::DirectedShortChannelId(..) = path.introduction_node() {
714 if let Some(pubkey) = path
715 .public_introduction_node_id(network_graph)
716 .and_then(|node_id| node_id.as_pubkey().ok())
717 {
718 *path.introduction_node_mut() = IntroductionNode::NodeId(pubkey);
719 }
720 }
721 }
722 }
723
724 pub(super) fn num_hops(&self) -> usize {
725 match self {
726 Destination::Node(_) => 1,
727 Destination::BlindedPath(path) => path.blinded_hops().len(),
728 }
729 }
730
731 fn first_node(&self) -> Option<PublicKey> {
732 match self {
733 Destination::Node(node_id) => Some(*node_id),
734 Destination::BlindedPath(path) => {
735 match path.introduction_node() {
736 IntroductionNode::NodeId(pubkey) => Some(*pubkey),
737 IntroductionNode::DirectedShortChannelId(..) => None,
738 }
739 },
740 }
741 }
742}
743
/// Result of successfully queueing an onion message for sending.
#[derive(Clone, Hash, Debug, PartialEq, Eq)]
pub enum SendSuccess {
	/// The message was buffered for a peer that is currently connected.
	Buffered,
	/// The message was buffered for the given node, to which we are not yet connected.
	BufferedAwaitingConnection(PublicKey),
}
756
/// Errors that may occur when sending an onion message.
#[derive(Clone, Hash, Debug, PartialEq, Eq)]
pub enum SendError {
	/// Errored computing onion message packet keys.
	// NOTE(review): not produced directly in the visible code; presumably surfaced by
	// `packet_payloads_and_keys` — confirm.
	Secp256k1(secp256k1::Error),
	/// The onion packet could not be constructed from the payloads (reported by
	/// `construct_onion_message_packet`).
	TooBigPacket,
	/// The provided [`Destination`] was a blinded path with no blinded hops.
	TooFewBlindedHops,
	/// The first hop is not a known recipient and no addresses were provided to connect to it.
	InvalidFirstHop(PublicKey),
	/// No path could be found to the destination, or no blinded reply path could be created.
	PathNotFound,
	/// The message's TLV type is below 64, which is rejected.
	InvalidMessage,
	/// The outbound buffer for the first hop is full.
	BufferFull,
	/// Failed to retrieve our node id from the provided [`NodeSigner`].
	GetNodeIdFailed,
	/// The blinded path's introduction node is a short channel id that could not be resolved to
	/// a node id.
	UnresolvedIntroductionNode,
	/// We are the introduction node of the destination blinded path and advancing the path to
	/// its next hop failed.
	BlindedPathAdvanceFailed,
}
796
/// Handler for custom onion messages, i.e. message types that are not built into this crate.
pub trait CustomOnionMessageHandler {
	/// The message type this handler understands. It must implement [`OnionMessageContents`].
	type CustomMessage: OnionMessageContents;

	/// Called with a received custom message; may return a response together with the
	/// [`ResponseInstruction`] describing how to send it.
	// NOTE(review): the call site is outside the visible chunk. Presumably `context` carries
	// the bytes of a custom blinded-path context and `responder` is `Some` when the sender
	// included a reply path — confirm.
	fn handle_custom_message(
		&self, message: Self::CustomMessage, context: Option<Vec<u8>>, responder: Option<Responder>
	) -> Option<(Self::CustomMessage, ResponseInstruction)>;

	/// Reads a custom message of type `message_type` from `buffer`.
	// NOTE(review): presumably `Ok(None)` signals an unknown message type — confirm against
	// the decoder that calls this.
	fn read_custom_message<R: io::Read>(&self, message_type: u64, buffer: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError>;

	/// Releases any custom messages waiting to be sent, each paired with the
	/// [`MessageSendInstructions`] for sending it.
	fn release_pending_custom_messages(&self) -> Vec<(Self::CustomMessage, MessageSendInstructions)>;
}
829
/// The result of peeling one layer off an [`OnionMessage`]; see [`peel_onion_message`].
#[derive(Clone, Debug)]
pub enum PeeledOnion<T: OnionMessageContents> {
	/// The message is not for us: the next hop and the re-wrapped message to forward to it.
	Forward(NextMessageHop, OnionMessage),
	/// The message is for us: its parsed contents, the context of the blinded path it arrived
	/// over (if any), and the reply path supplied by the sender (if any).
	Receive(ParsedOnionMessageContents<T>, Option<MessageContext>, Option<BlindedMessagePath>)
}
839
840
841pub fn create_onion_message_resolving_destination<
848 ES: Deref, NS: Deref, NL: Deref, T: OnionMessageContents
849>(
850 entropy_source: &ES, node_signer: &NS, node_id_lookup: &NL,
851 network_graph: &ReadOnlyNetworkGraph, secp_ctx: &Secp256k1<secp256k1::All>,
852 mut path: OnionMessagePath, contents: T, reply_path: Option<BlindedMessagePath>,
853) -> Result<(PublicKey, OnionMessage, Option<Vec<SocketAddress>>), SendError>
854where
855 ES::Target: EntropySource,
856 NS::Target: NodeSigner,
857 NL::Target: NodeIdLookUp,
858{
859 path.destination.resolve(network_graph);
860 create_onion_message(
861 entropy_source, node_signer, node_id_lookup, secp_ctx, path, contents, reply_path,
862 )
863}
864
865pub fn create_onion_message<ES: Deref, NS: Deref, NL: Deref, T: OnionMessageContents>(
877 entropy_source: &ES, node_signer: &NS, node_id_lookup: &NL,
878 secp_ctx: &Secp256k1<secp256k1::All>, path: OnionMessagePath, contents: T,
879 reply_path: Option<BlindedMessagePath>,
880) -> Result<(PublicKey, OnionMessage, Option<Vec<SocketAddress>>), SendError>
881where
882 ES::Target: EntropySource,
883 NS::Target: NodeSigner,
884 NL::Target: NodeIdLookUp,
885{
886 let OnionMessagePath { intermediate_nodes, mut destination, first_node_addresses } = path;
887 if let Destination::BlindedPath(ref path) = destination {
888 if path.blinded_hops().is_empty() {
889 return Err(SendError::TooFewBlindedHops);
890 }
891 }
892
893 if contents.tlv_type() < 64 { return Err(SendError::InvalidMessage) }
894
895 if intermediate_nodes.len() == 0 {
898 if let Destination::BlindedPath(ref mut blinded_path) = destination {
899 let our_node_id = node_signer.get_node_id(Recipient::Node)
900 .map_err(|()| SendError::GetNodeIdFailed)?;
901 let introduction_node_id = match blinded_path.introduction_node() {
902 IntroductionNode::NodeId(pubkey) => *pubkey,
903 IntroductionNode::DirectedShortChannelId(direction, scid) => {
904 match node_id_lookup.next_node_id(*scid) {
905 Some(next_node_id) => *direction.select_pubkey(&our_node_id, &next_node_id),
906 None => return Err(SendError::UnresolvedIntroductionNode),
907 }
908 },
909 };
910 if introduction_node_id == our_node_id {
911 blinded_path.advance_path_by_one(node_signer, node_id_lookup, &secp_ctx)
912 .map_err(|()| SendError::BlindedPathAdvanceFailed)?;
913 }
914 }
915 }
916
917 let blinding_secret_bytes = entropy_source.get_secure_random_bytes();
918 let blinding_secret = SecretKey::from_slice(&blinding_secret_bytes[..]).expect("RNG is busted");
919 let (first_node_id, blinding_point) = if let Some(first_node_id) = intermediate_nodes.first() {
920 (*first_node_id, PublicKey::from_secret_key(&secp_ctx, &blinding_secret))
921 } else {
922 match &destination {
923 Destination::Node(pk) => (*pk, PublicKey::from_secret_key(&secp_ctx, &blinding_secret)),
924 Destination::BlindedPath(path) => {
925 match path.introduction_node() {
926 IntroductionNode::NodeId(pubkey) => (*pubkey, path.blinding_point()),
927 IntroductionNode::DirectedShortChannelId(..) => {
928 return Err(SendError::UnresolvedIntroductionNode);
929 },
930 }
931 }
932 }
933 };
934 let (packet_payloads, packet_keys) = packet_payloads_and_keys(
935 &secp_ctx, intermediate_nodes, destination, contents, reply_path, &blinding_secret
936 )?;
937
938 let prng_seed = entropy_source.get_secure_random_bytes();
939 let onion_routing_packet = construct_onion_message_packet(
940 packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?;
941
942 let message = OnionMessage { blinding_point, onion_routing_packet };
943 Ok((first_node_id, message, first_node_addresses))
944}
945
946pub fn peel_onion_message<NS: Deref, L: Deref, CMH: Deref>(
951 msg: &OnionMessage, secp_ctx: &Secp256k1<secp256k1::All>, node_signer: NS, logger: L,
952 custom_handler: CMH,
953) -> Result<PeeledOnion<<<CMH>::Target as CustomOnionMessageHandler>::CustomMessage>, ()>
954where
955 NS::Target: NodeSigner,
956 L::Target: Logger,
957 CMH::Target: CustomOnionMessageHandler,
958{
959 let control_tlvs_ss = match node_signer.ecdh(Recipient::Node, &msg.blinding_point, None) {
960 Ok(ss) => ss,
961 Err(e) => {
962 log_error!(logger, "Failed to retrieve node secret: {:?}", e);
963 return Err(());
964 }
965 };
966 let onion_decode_ss = {
967 let blinding_factor = {
968 let mut hmac = HmacEngine::<Sha256>::new(b"blinded_node_id");
969 hmac.input(control_tlvs_ss.as_ref());
970 Hmac::from_engine(hmac).to_byte_array()
971 };
972 match node_signer.ecdh(Recipient::Node, &msg.onion_routing_packet.public_key,
973 Some(&Scalar::from_be_bytes(blinding_factor).unwrap()))
974 {
975 Ok(ss) => ss.secret_bytes(),
976 Err(()) => {
977 log_trace!(logger, "Failed to compute onion packet shared secret");
978 return Err(());
979 }
980 }
981 };
982 match onion_utils::decode_next_untagged_hop(
983 onion_decode_ss, &msg.onion_routing_packet.hop_data[..], msg.onion_routing_packet.hmac,
984 (control_tlvs_ss, custom_handler.deref(), logger.deref())
985 ) {
986 Ok((Payload::Receive::<ParsedOnionMessageContents<<<CMH as Deref>::Target as CustomOnionMessageHandler>::CustomMessage>> {
987 message, control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { context }), reply_path,
988 }, None)) => {
989 match (&message, &context) {
990 (_, None) => {
991 Ok(PeeledOnion::Receive(message, None, reply_path))
992 }
993 (ParsedOnionMessageContents::Offers(_), Some(MessageContext::Offers(_))) => {
994 Ok(PeeledOnion::Receive(message, context, reply_path))
995 }
996 #[cfg(async_payments)]
997 (ParsedOnionMessageContents::AsyncPayments(_), Some(MessageContext::AsyncPayments(_))) => {
998 Ok(PeeledOnion::Receive(message, context, reply_path))
999 }
1000 (ParsedOnionMessageContents::Custom(_), Some(MessageContext::Custom(_))) => {
1001 Ok(PeeledOnion::Receive(message, context, reply_path))
1002 }
1003 (ParsedOnionMessageContents::DNSResolver(_), Some(MessageContext::DNSResolver(_))) => {
1004 Ok(PeeledOnion::Receive(message, context, reply_path))
1005 }
1006 _ => {
1007 log_trace!(logger, "Received message was sent on a blinded path with the wrong context.");
1008 Err(())
1009 }
1010 }
1011 },
1012 Ok((Payload::Forward(ForwardControlTlvs::Unblinded(ForwardTlvs {
1013 next_hop, next_blinding_override
1014 })), Some((next_hop_hmac, new_packet_bytes)))) => {
1015 let new_pubkey = match onion_utils::next_hop_pubkey(&secp_ctx, msg.onion_routing_packet.public_key, &onion_decode_ss) {
1021 Ok(pk) => pk,
1022 Err(e) => {
1023 log_trace!(logger, "Failed to compute next hop packet pubkey: {}", e);
1024 return Err(())
1025 }
1026 };
1027 let outgoing_packet = Packet {
1028 version: 0,
1029 public_key: new_pubkey,
1030 hop_data: new_packet_bytes,
1031 hmac: next_hop_hmac,
1032 };
1033 let onion_message = OnionMessage {
1034 blinding_point: match next_blinding_override {
1035 Some(blinding_point) => blinding_point,
1036 None => {
1037 match onion_utils::next_hop_pubkey(
1038 &secp_ctx, msg.blinding_point, control_tlvs_ss.as_ref()
1039 ) {
1040 Ok(bp) => bp,
1041 Err(e) => {
1042 log_trace!(logger, "Failed to compute next blinding point: {}", e);
1043 return Err(())
1044 }
1045 }
1046 }
1047 },
1048 onion_routing_packet: outgoing_packet,
1049 };
1050
1051 Ok(PeeledOnion::Forward(next_hop, onion_message))
1052 },
1053 Err(e) => {
1054 log_trace!(logger, "Errored decoding onion message packet: {:?}", e);
1055 Err(())
1056 },
1057 _ => {
1058 log_trace!(logger, "Received bogus onion message packet, either the sender encoded a final hop as a forwarding hop or vice versa");
1059 Err(())
1060 },
1061 }
1062}
1063
// Removes successfully handled events from `$event_queue` and aborts the enclosing function if
// any event failed.
//
// `$res_iter` must yield the handling results in the same order as the events at the front of
// the queue. An event is dropped when its result is `Ok`; it is kept when its result is `Err`
// or when `$res_iter` has run out of results (i.e. queue entries beyond the handled ones).
//
// If at least one result was an error, the macro clears `$self.pending_events_processor`,
// notifies `$self.event_notifier`, and executes `return;` — so it may only be used inside a
// function returning `()`.
macro_rules! drop_handled_events_and_abort { ($self: expr, $res_iter: expr, $event_queue: expr) => {
	{
		let mut queue_lock = $event_queue.lock().unwrap();

		let mut any_error = false;
		// `retain` visits the events in order, pairing each with the next handling result.
		queue_lock.retain(|_| {
			$res_iter.next().map_or(true, |r| {
				let is_err = r.is_err();
				any_error |= is_err;
				is_err
			})
		});

		if any_error {
			// Release the processing flag before bailing out, then notify so the failed events
			// can be picked up again.
			$self.pending_events_processor.store(false, Ordering::Release);
			$self.event_notifier.notify();
			return;
		}
	}
}}
1090
1091impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, DRH: Deref, CMH: Deref>
1092OnionMessenger<ES, NS, L, NL, MR, OMH, APH, DRH, CMH>
1093where
1094 ES::Target: EntropySource,
1095 NS::Target: NodeSigner,
1096 L::Target: Logger,
1097 NL::Target: NodeIdLookUp,
1098 MR::Target: MessageRouter,
1099 OMH::Target: OffersMessageHandler,
1100 APH::Target: AsyncPaymentsMessageHandler,
1101 DRH::Target: DNSResolverMessageHandler,
1102 CMH::Target: CustomOnionMessageHandler,
1103{
1104 pub fn new(
1107 entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
1108 offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH, custom_handler: CMH,
1109 ) -> Self {
1110 Self::new_inner(
1111 entropy_source, node_signer, logger, node_id_lookup, message_router,
1112 offers_handler, async_payments_handler, dns_resolver, custom_handler, false,
1113 )
1114 }
1115
1116 pub fn new_with_offline_peer_interception(
1138 entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL,
1139 message_router: MR, offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH,
1140 custom_handler: CMH,
1141 ) -> Self {
1142 Self::new_inner(
1143 entropy_source, node_signer, logger, node_id_lookup, message_router,
1144 offers_handler, async_payments_handler, dns_resolver, custom_handler, true,
1145 )
1146 }
1147
1148 fn new_inner(
1149 entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL,
1150 message_router: MR, offers_handler: OMH, async_payments_handler: APH, dns_resolver: DRH,
1151 custom_handler: CMH, intercept_messages_for_offline_peers: bool,
1152 ) -> Self {
1153 let mut secp_ctx = Secp256k1::new();
1154 secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes());
1155 OnionMessenger {
1156 entropy_source,
1157 node_signer,
1158 message_recipients: Mutex::new(new_hash_map()),
1159 secp_ctx,
1160 logger,
1161 node_id_lookup,
1162 message_router,
1163 offers_handler,
1164 async_payments_handler,
1165 dns_resolver_handler: dns_resolver,
1166 custom_handler,
1167 intercept_messages_for_offline_peers,
1168 pending_intercepted_msgs_events: Mutex::new(Vec::new()),
1169 pending_peer_connected_events: Mutex::new(Vec::new()),
1170 pending_events_processor: AtomicBool::new(false),
1171 event_notifier: Notifier::new(),
1172 }
1173 }
1174
/// Test-only hook that replaces the offers handler after construction.
#[cfg(test)]
pub(crate) fn set_offers_handler(&mut self, offers_handler: OMH) {
	self.offers_handler = offers_handler;
}
1179
/// Sends an [`OnionMessage`] with the given `contents` according to `instructions`.
///
/// On success the message has been buffered for its first hop; see [`SendSuccess`] for whether
/// that peer is already connected. Failures are reported as [`SendError`].
pub fn send_onion_message<T: OnionMessageContents>(
	&self, contents: T, instructions: MessageSendInstructions,
) -> Result<SendSuccess, SendError> {
	// The public entry point adds no suffix to the log lines.
	self.send_onion_message_internal(contents, instructions, format_args!(""))
}
1186
1187 fn send_onion_message_internal<T: OnionMessageContents>(
1188 &self, contents: T, instructions: MessageSendInstructions, log_suffix: fmt::Arguments,
1189 ) -> Result<SendSuccess, SendError> {
1190 let (destination, reply_path) = match instructions {
1191 MessageSendInstructions::WithSpecifiedReplyPath { destination, reply_path } =>
1192 (destination, Some(reply_path)),
1193 MessageSendInstructions::WithReplyPath { destination, context }
1194 |MessageSendInstructions::ForReply { instructions: ResponseInstruction { destination, context: Some(context) } } =>
1195 {
1196 match self.create_blinded_path(context) {
1197 Ok(reply_path) => (destination, Some(reply_path)),
1198 Err(err) => {
1199 log_trace!(
1200 self.logger,
1201 "Failed to create reply path {}: {:?}",
1202 log_suffix, err
1203 );
1204 return Err(err);
1205 }
1206 }
1207 },
1208 MessageSendInstructions::WithoutReplyPath { destination }
1209 |MessageSendInstructions::ForReply { instructions: ResponseInstruction { destination, context: None } } =>
1210 (destination, None),
1211 };
1212
1213 let mut logger = WithContext::from(&self.logger, None, None, None);
1214 let result = self.find_path(destination).and_then(|path| {
1215 let first_hop = path.intermediate_nodes.get(0).map(|p| *p);
1216 logger = WithContext::from(&self.logger, first_hop, None, None);
1217 self.enqueue_onion_message(path, contents, reply_path, log_suffix)
1218 });
1219
1220 match result.as_ref() {
1221 Err(SendError::GetNodeIdFailed) => {
1222 log_warn!(logger, "Unable to retrieve node id {}", log_suffix);
1223 },
1224 Err(SendError::PathNotFound) => {
1225 log_trace!(logger, "Failed to find path {}", log_suffix);
1226 },
1227 Err(e) => {
1228 log_trace!(logger, "Failed sending onion message {}: {:?}", log_suffix, e);
1229 },
1230 Ok(SendSuccess::Buffered) => {
1231 log_trace!(logger, "Buffered onion message {}", log_suffix);
1232 },
1233 Ok(SendSuccess::BufferedAwaitingConnection(node_id)) => {
1234 log_trace!(
1235 logger,
1236 "Buffered onion message waiting on peer connection {}: {}",
1237 log_suffix, node_id
1238 );
1239 },
1240 }
1241
1242 result
1243 }
1244
1245 fn find_path(&self, destination: Destination) -> Result<OnionMessagePath, SendError> {
1246 let sender = self.node_signer
1247 .get_node_id(Recipient::Node)
1248 .map_err(|_| SendError::GetNodeIdFailed)?;
1249
1250 let peers = self.message_recipients.lock().unwrap()
1251 .iter()
1252 .filter(|(_, recipient)| matches!(recipient, OnionMessageRecipient::ConnectedPeer(_)))
1253 .map(|(node_id, _)| *node_id)
1254 .collect();
1255
1256 self.message_router
1257 .find_path(sender, peers, destination)
1258 .map_err(|_| SendError::PathNotFound)
1259 }
1260
1261 fn create_blinded_path(&self, context: MessageContext) -> Result<BlindedMessagePath, SendError> {
1262 let recipient = self.node_signer
1263 .get_node_id(Recipient::Node)
1264 .map_err(|_| SendError::GetNodeIdFailed)?;
1265 let secp_ctx = &self.secp_ctx;
1266
1267 let peers = self.message_recipients.lock().unwrap()
1268 .iter()
1269 .filter(|(_, peer)| matches!(peer, OnionMessageRecipient::ConnectedPeer(_)))
1270 .map(|(node_id, _ )| *node_id)
1271 .collect::<Vec<_>>();
1272
1273 self.message_router
1274 .create_blinded_paths(recipient, context, peers, secp_ctx)
1275 .and_then(|paths| paths.into_iter().next().ok_or(()))
1276 .map_err(|_| SendError::PathNotFound)
1277 }
1278
1279 fn enqueue_onion_message<T: OnionMessageContents>(
1280 &self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedMessagePath>,
1281 log_suffix: fmt::Arguments
1282 ) -> Result<SendSuccess, SendError> {
1283 log_trace!(self.logger, "Constructing onion message {}: {:?}", log_suffix, contents);
1284
1285 let (first_node_id, onion_message, addresses) = create_onion_message(
1286 &self.entropy_source, &self.node_signer, &self.node_id_lookup, &self.secp_ctx, path,
1287 contents, reply_path,
1288 )?;
1289
1290 let mut message_recipients = self.message_recipients.lock().unwrap();
1291 if outbound_buffer_full(&first_node_id, &message_recipients) {
1292 return Err(SendError::BufferFull);
1293 }
1294
1295 match message_recipients.entry(first_node_id) {
1296 hash_map::Entry::Vacant(e) => match addresses {
1297 None => Err(SendError::InvalidFirstHop(first_node_id)),
1298 Some(addresses) => {
1299 e.insert(OnionMessageRecipient::pending_connection(addresses))
1300 .enqueue_message(onion_message);
1301 self.event_notifier.notify();
1302 Ok(SendSuccess::BufferedAwaitingConnection(first_node_id))
1303 },
1304 },
1305 hash_map::Entry::Occupied(mut e) => {
1306 e.get_mut().enqueue_message(onion_message);
1307 if e.get().is_connected() {
1308 Ok(SendSuccess::Buffered)
1309 } else {
1310 Ok(SendSuccess::BufferedAwaitingConnection(first_node_id))
1311 }
1312 },
1313 }
1314 }
1315
1316 pub fn forward_onion_message(
1321 &self, message: OnionMessage, peer_node_id: &PublicKey
1322 ) -> Result<(), SendError> {
1323 let mut message_recipients = self.message_recipients.lock().unwrap();
1324 if outbound_buffer_full(&peer_node_id, &message_recipients) {
1325 return Err(SendError::BufferFull);
1326 }
1327
1328 match message_recipients.entry(*peer_node_id) {
1329 hash_map::Entry::Occupied(mut e) if e.get().is_connected() => {
1330 e.get_mut().enqueue_message(message);
1331 Ok(())
1332 },
1333 _ => Err(SendError::InvalidFirstHop(*peer_node_id))
1334 }
1335 }
1336
1337 #[cfg(any(test, feature = "_test_utils"))]
1338 pub fn send_onion_message_using_path<T: OnionMessageContents>(
1339 &self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedMessagePath>
1340 ) -> Result<SendSuccess, SendError> {
1341 self.enqueue_onion_message(path, contents, reply_path, format_args!(""))
1342 }
1343
1344 pub(crate) fn peel_onion_message(
1345 &self, msg: &OnionMessage
1346 ) -> Result<PeeledOnion<<<CMH>::Target as CustomOnionMessageHandler>::CustomMessage>, ()> {
1347 peel_onion_message(
1348 msg, &self.secp_ctx, &*self.node_signer, &*self.logger, &*self.custom_handler
1349 )
1350 }
1351
1352 pub fn handle_onion_message_response<T: OnionMessageContents>(
1361 &self, response: T, instructions: ResponseInstruction,
1362 ) -> Result<SendSuccess, SendError> {
1363 let message_type = response.msg_type();
1364 self.send_onion_message_internal(
1365 response, instructions.into_instructions(),
1366 format_args!(
1367 "when responding with {} to an onion message",
1368 message_type,
1369 )
1370 )
1371 }
1372
1373 #[cfg(test)]
1374 pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, VecDeque<OnionMessage>> {
1375 let mut message_recipients = self.message_recipients.lock().unwrap();
1376 let mut msgs = new_hash_map();
1377 for (node_id, recipient) in &mut *message_recipients {
1380 msgs.insert(*node_id, recipient.release_pending_messages());
1381 }
1382 msgs
1383 }
1384
1385 fn enqueue_intercepted_event(&self, event: Event) {
1386 const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
1387 let mut pending_intercepted_msgs_events =
1388 self.pending_intercepted_msgs_events.lock().unwrap();
1389 let total_buffered_bytes: usize = pending_intercepted_msgs_events.iter()
1390 .map(|ev| ev.serialized_length()).sum();
1391 if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
1392 log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
1393 return
1394 }
1395 pending_intercepted_msgs_events.push(event);
1396 self.event_notifier.notify();
1397 }
1398
1399 pub fn get_update_future(&self) -> Future {
1407 self.event_notifier.get_future()
1408 }
1409
1410 pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin, H: Fn(Event) -> Future>(
1418 &self, handler: H
1419 ) {
1420 if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
1421 return;
1422 }
1423
1424 {
1425 let intercepted_msgs = self.pending_intercepted_msgs_events.lock().unwrap().clone();
1426 let mut futures = Vec::with_capacity(intercepted_msgs.len());
1427 for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
1428 if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
1429 if let Some(addresses) = addresses.take() {
1430 let event = Event::ConnectionNeeded { node_id: *node_id, addresses };
1431 log_trace!(self.logger, "Handling event {:?} async...", event);
1432 let future = ResultFuture::Pending(handler(event));
1433 futures.push(future);
1434 }
1435 }
1436 }
1437
1438 let intercepted_msgs_offset = futures.len();
1441
1442 for ev in intercepted_msgs {
1443 if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
1444 log_trace!(self.logger, "Handling event {:?} async...", ev);
1445 let future = ResultFuture::Pending(handler(ev));
1446 futures.push(future);
1447 }
1448
1449 if !futures.is_empty() {
1450 let res = MultiResultFuturePoller::new(futures).await;
1452 log_trace!(self.logger, "Done handling events async, results: {:?}", res);
1453 let mut res_iter = res.iter().skip(intercepted_msgs_offset);
1454 drop_handled_events_and_abort!(self, res_iter, self.pending_intercepted_msgs_events);
1455 }
1456 }
1457
1458 {
1459 let peer_connecteds = self.pending_peer_connected_events.lock().unwrap().clone();
1460 let num_peer_connecteds = peer_connecteds.len();
1461 if num_peer_connecteds <= 1 {
1462 for event in peer_connecteds {
1463 if handler(event).await.is_ok() {
1464 self.pending_peer_connected_events.lock().unwrap().drain(..num_peer_connecteds);
1465 } else {
1466 self.pending_events_processor.store(false, Ordering::Release);
1468 return;
1469 }
1470 }
1471 } else {
1472 let mut futures = Vec::new();
1473 for event in peer_connecteds {
1474 log_trace!(self.logger, "Handling event {:?} async...", event);
1475 let future = ResultFuture::Pending(handler(event));
1476 futures.push(future);
1477 }
1478
1479 if !futures.is_empty() {
1480 let res = MultiResultFuturePoller::new(futures).await;
1481 log_trace!(self.logger, "Done handling events async, results: {:?}", res);
1482 let mut res_iter = res.iter();
1483 drop_handled_events_and_abort!(self, res_iter, self.pending_peer_connected_events);
1484 }
1485 }
1486 }
1487 self.pending_events_processor.store(false, Ordering::Release);
1488 }
1489}
1490
1491fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, OnionMessageRecipient>) -> bool {
1492 const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128;
1493 const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256;
1494 let mut total_buffered_bytes = 0;
1495 let mut peer_buffered_bytes = 0;
1496 for (pk, peer_buf) in buffer {
1497 for om in peer_buf.pending_messages() {
1498 let om_len = om.serialized_length();
1499 if pk == peer_node_id {
1500 peer_buffered_bytes += om_len;
1501 }
1502 total_buffered_bytes += om_len;
1503
1504 if total_buffered_bytes >= MAX_TOTAL_BUFFER_SIZE ||
1505 peer_buffered_bytes >= MAX_PER_PEER_BUFFER_SIZE
1506 {
1507 return true
1508 }
1509 }
1510 }
1511 false
1512}
1513
1514impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, DRH: Deref, CMH: Deref> EventsProvider
1515for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, DRH, CMH>
1516where
1517 ES::Target: EntropySource,
1518 NS::Target: NodeSigner,
1519 L::Target: Logger,
1520 NL::Target: NodeIdLookUp,
1521 MR::Target: MessageRouter,
1522 OMH::Target: OffersMessageHandler,
1523 APH::Target: AsyncPaymentsMessageHandler,
1524 DRH::Target: DNSResolverMessageHandler,
1525 CMH::Target: CustomOnionMessageHandler,
1526{
1527 fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
1528 if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
1529 return;
1530 }
1531
1532 for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
1533 if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
1534 if let Some(addresses) = addresses.take() {
1535 let event = Event::ConnectionNeeded { node_id: *node_id, addresses };
1536 log_trace!(self.logger, "Handling event {:?}...", event);
1537 let res = handler.handle_event(event);
1538 log_trace!(self.logger, "Done handling event, ignoring result: {:?}", res);
1539 }
1540 }
1541 }
1542 let intercepted_msgs;
1543 let peer_connecteds;
1544 {
1545 let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
1546 intercepted_msgs = pending_intercepted_msgs_events.clone();
1547 let pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
1548 peer_connecteds = pending_peer_connected_events.clone();
1549 #[cfg(debug_assertions)] {
1550 for ev in pending_intercepted_msgs_events.iter() {
1551 if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
1552 }
1553 for ev in pending_peer_connected_events.iter() {
1554 if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
1555 }
1556 }
1557 }
1558
1559 let mut handling_intercepted_msgs_failed = false;
1560 let mut num_handled_intercepted_events = 0;
1561 for ev in intercepted_msgs {
1562 log_trace!(self.logger, "Handling event {:?}...", ev);
1563 let res = handler.handle_event(ev);
1564 log_trace!(self.logger, "Done handling event, result: {:?}", res);
1565 match res {
1566 Ok(()) => num_handled_intercepted_events += 1,
1567 Err(ReplayEvent ()) => {
1568 handling_intercepted_msgs_failed = true;
1569 break;
1570 }
1571 }
1572 }
1573
1574 {
1575 let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
1576 pending_intercepted_msgs_events.drain(..num_handled_intercepted_events);
1577 }
1578
1579 if handling_intercepted_msgs_failed {
1580 self.pending_events_processor.store(false, Ordering::Release);
1581 self.event_notifier.notify();
1582 return;
1583 }
1584
1585 let mut num_handled_peer_connecteds = 0;
1586 for ev in peer_connecteds {
1587 log_trace!(self.logger, "Handling event {:?}...", ev);
1588 let res = handler.handle_event(ev);
1589 log_trace!(self.logger, "Done handling event, result: {:?}", res);
1590 match res {
1591 Ok(()) => num_handled_peer_connecteds += 1,
1592 Err(ReplayEvent ()) => {
1593 self.event_notifier.notify();
1594 break;
1595 }
1596 }
1597 }
1598
1599 {
1600 let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
1601 pending_peer_connected_events.drain(..num_handled_peer_connecteds);
1602 pending_peer_connected_events.shrink_to(10); }
1604
1605 self.pending_events_processor.store(false, Ordering::Release);
1606 }
1607}
1608
1609impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, DRH: Deref, CMH: Deref> OnionMessageHandler
1610for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, DRH, CMH>
1611where
1612 ES::Target: EntropySource,
1613 NS::Target: NodeSigner,
1614 L::Target: Logger,
1615 NL::Target: NodeIdLookUp,
1616 MR::Target: MessageRouter,
1617 OMH::Target: OffersMessageHandler,
1618 APH::Target: AsyncPaymentsMessageHandler,
1619 DRH::Target: DNSResolverMessageHandler,
1620 CMH::Target: CustomOnionMessageHandler,
1621{
1622 fn handle_onion_message(&self, peer_node_id: PublicKey, msg: &OnionMessage) {
1623 let logger = WithContext::from(&self.logger, Some(peer_node_id), None, None);
1624 match self.peel_onion_message(msg) {
1625 Ok(PeeledOnion::Receive(message, context, reply_path)) => {
1626 log_trace!(
1627 logger,
1628 "Received an onion message with {} reply_path: {:?}",
1629 if reply_path.is_some() { "a" } else { "no" }, message);
1630
1631 let responder = reply_path.map(Responder::new);
1632 match message {
1633 ParsedOnionMessageContents::Offers(msg) => {
1634 let context = match context {
1635 None => None,
1636 Some(MessageContext::Offers(context)) => Some(context),
1637 _ => {
1638 debug_assert!(false, "Checked in peel_onion_message");
1639 return
1640 }
1641 };
1642 let response_instructions = self.offers_handler.handle_message(msg, context, responder);
1643 if let Some((msg, instructions)) = response_instructions {
1644 let _ = self.handle_onion_message_response(msg, instructions);
1645 }
1646 },
1647 #[cfg(async_payments)]
1648 ParsedOnionMessageContents::AsyncPayments(AsyncPaymentsMessage::HeldHtlcAvailable(msg)) => {
1649 let response_instructions = self.async_payments_handler.handle_held_htlc_available(
1650 msg, responder
1651 );
1652 if let Some((msg, instructions)) = response_instructions {
1653 let _ = self.handle_onion_message_response(msg, instructions);
1654 }
1655 },
1656 #[cfg(async_payments)]
1657 ParsedOnionMessageContents::AsyncPayments(AsyncPaymentsMessage::ReleaseHeldHtlc(msg)) => {
1658 let context = match context {
1659 Some(MessageContext::AsyncPayments(context)) => context,
1660 Some(_) => {
1661 debug_assert!(false, "Checked in peel_onion_message");
1662 return
1663 },
1664 None => return,
1665 };
1666 self.async_payments_handler.handle_release_held_htlc(msg, context);
1667 },
1668 ParsedOnionMessageContents::DNSResolver(DNSResolverMessage::DNSSECQuery(msg)) => {
1669 let response_instructions = self.dns_resolver_handler.handle_dnssec_query(msg, responder);
1670 if let Some((msg, instructions)) = response_instructions {
1671 let _ = self.handle_onion_message_response(msg, instructions);
1672 }
1673 },
1674 ParsedOnionMessageContents::DNSResolver(DNSResolverMessage::DNSSECProof(msg)) => {
1675 let context = match context {
1676 Some(MessageContext::DNSResolver(context)) => context,
1677 _ => return,
1678 };
1679 self.dns_resolver_handler.handle_dnssec_proof(msg, context);
1680 },
1681 ParsedOnionMessageContents::Custom(msg) => {
1682 let context = match context {
1683 None => None,
1684 Some(MessageContext::Custom(data)) => Some(data),
1685 _ => {
1686 debug_assert!(false, "Checked in peel_onion_message");
1687 return
1688 }
1689 };
1690 let response_instructions = self.custom_handler.handle_custom_message(msg, context, responder);
1691 if let Some((msg, instructions)) = response_instructions {
1692 let _ = self.handle_onion_message_response(msg, instructions);
1693 }
1694 },
1695 }
1696 },
1697 Ok(PeeledOnion::Forward(next_hop, onion_message)) => {
1698 let next_node_id = match next_hop {
1699 NextMessageHop::NodeId(pubkey) => pubkey,
1700 NextMessageHop::ShortChannelId(scid) => match self.node_id_lookup.next_node_id(scid) {
1701 Some(pubkey) => pubkey,
1702 None => {
1703 log_trace!(self.logger, "Dropping forwarded onion messager: unable to resolve next hop using SCID {}", scid);
1704 return
1705 },
1706 },
1707 };
1708
1709 let mut message_recipients = self.message_recipients.lock().unwrap();
1710 if outbound_buffer_full(&next_node_id, &message_recipients) {
1711 log_trace!(
1712 logger,
1713 "Dropping forwarded onion message to peer {}: outbound buffer full",
1714 next_node_id);
1715 return
1716 }
1717
1718 #[cfg(fuzzing)]
1719 message_recipients
1720 .entry(next_node_id)
1721 .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()));
1722
1723 match message_recipients.entry(next_node_id) {
1724 hash_map::Entry::Occupied(mut e) if matches!(
1725 e.get(), OnionMessageRecipient::ConnectedPeer(..)
1726 ) => {
1727 e.get_mut().enqueue_message(onion_message);
1728 log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
1729 },
1730 _ if self.intercept_messages_for_offline_peers => {
1731 self.enqueue_intercepted_event(
1732 Event::OnionMessageIntercepted {
1733 peer_node_id: next_node_id, message: onion_message
1734 }
1735 );
1736 },
1737 _ => {
1738 log_trace!(
1739 logger,
1740 "Dropping forwarded onion message to disconnected peer {}",
1741 next_node_id);
1742 return
1743 },
1744 }
1745 },
1746 Err(e) => {
1747 log_error!(logger, "Failed to process onion message {:?}", e);
1748 }
1749 }
1750 }
1751
1752 fn peer_connected(&self, their_node_id: PublicKey, init: &msgs::Init, _inbound: bool) -> Result<(), ()> {
1753 if init.features.supports_onion_messages() {
1754 self.message_recipients.lock().unwrap()
1755 .entry(their_node_id)
1756 .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
1757 .mark_connected();
1758 if self.intercept_messages_for_offline_peers {
1759 let mut pending_peer_connected_events =
1760 self.pending_peer_connected_events.lock().unwrap();
1761 pending_peer_connected_events.push(
1762 Event::OnionMessagePeerConnected { peer_node_id: their_node_id }
1763 );
1764 self.event_notifier.notify();
1765 }
1766 } else {
1767 self.message_recipients.lock().unwrap().remove(&their_node_id);
1768 }
1769
1770 Ok(())
1771 }
1772
1773 fn peer_disconnected(&self, their_node_id: PublicKey) {
1774 match self.message_recipients.lock().unwrap().remove(&their_node_id) {
1775 Some(OnionMessageRecipient::ConnectedPeer(..)) => {},
1776 Some(_) => debug_assert!(false),
1777 None => {},
1778 }
1779 }
1780
1781 fn timer_tick_occurred(&self) {
1782 let mut message_recipients = self.message_recipients.lock().unwrap();
1783
1784 message_recipients.retain(|_, recipient| match recipient {
1787 OnionMessageRecipient::PendingConnection(_, None, ticks) => *ticks < MAX_TIMER_TICKS,
1788 OnionMessageRecipient::PendingConnection(_, Some(_), _) => true,
1789 _ => true,
1790 });
1791
1792 for recipient in message_recipients.values_mut() {
1795 if let OnionMessageRecipient::PendingConnection(_, None, ticks) = recipient {
1796 *ticks += 1;
1797 }
1798 }
1799 }
1800
1801 fn provided_node_features(&self) -> NodeFeatures {
1802 let mut features = NodeFeatures::empty();
1803 features.set_onion_messages_optional();
1804 features | self.dns_resolver_handler.provided_node_features()
1805 }
1806
1807 fn provided_init_features(&self, _their_node_id: PublicKey) -> InitFeatures {
1808 let mut features = InitFeatures::empty();
1809 features.set_onion_messages_optional();
1810 features
1811 }
1812
1813 fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option<OnionMessage> {
1817 for (message, instructions) in self.offers_handler.release_pending_messages() {
1819 let _ = self.send_onion_message_internal(
1820 message, instructions, format_args!("when sending OffersMessage")
1821 );
1822 }
1823
1824 #[cfg(async_payments)] {
1825 for (message, instructions) in self.async_payments_handler.release_pending_messages() {
1826 let _ = self.send_onion_message_internal(
1827 message, instructions, format_args!("when sending AsyncPaymentsMessage")
1828 );
1829 }
1830 }
1831
1832 for (message, instructions) in self.dns_resolver_handler.release_pending_messages() {
1834 let _ = self.send_onion_message_internal(
1835 message, instructions, format_args!("when sending DNSResolverMessage")
1836 );
1837 }
1838
1839 for (message, instructions) in self.custom_handler.release_pending_custom_messages() {
1841 let _ = self.send_onion_message_internal(
1842 message, instructions, format_args!("when sending CustomMessage")
1843 );
1844 }
1845
1846 self.message_recipients.lock().unwrap()
1847 .get_mut(&peer_node_id)
1848 .and_then(|buffer| buffer.dequeue_message())
1849 }
1850}
1851
/// An [`OnionMessenger`] whose components are all held in [`Arc`]s.
///
/// [`KeysManager`] provides entropy and signing, and a [`SimpleArcChannelManager`] is used for
/// node id lookup as well as for handling offers, async payments and DNS resolver messages.
/// Routing uses a [`DefaultMessageRouter`] and custom messages are ignored.
///
/// This variant is used when the `dnssec` feature is enabled.
#[cfg(all(not(c_bindings), feature = "dnssec"))]
pub type SimpleArcOnionMessenger<M, T, F, L> = OnionMessenger<
	Arc<KeysManager>,
	Arc<KeysManager>,
	Arc<L>,
	Arc<SimpleArcChannelManager<M, T, F, L>>,
	Arc<DefaultMessageRouter<Arc<NetworkGraph<Arc<L>>>, Arc<L>, Arc<KeysManager>>>,
	Arc<SimpleArcChannelManager<M, T, F, L>>,
	Arc<SimpleArcChannelManager<M, T, F, L>>,
	Arc<SimpleArcChannelManager<M, T, F, L>>,
	IgnoringMessageHandler,
>;
1874
1875#[cfg(not(c_bindings))]
1883#[cfg(not(feature = "dnssec"))]
1884pub type SimpleArcOnionMessenger<M, T, F, L> = OnionMessenger<
1885 Arc<KeysManager>,
1886 Arc<KeysManager>,
1887 Arc<L>,
1888 Arc<SimpleArcChannelManager<M, T, F, L>>,
1889 Arc<DefaultMessageRouter<Arc<NetworkGraph<Arc<L>>>, Arc<L>, Arc<KeysManager>>>,
1890 Arc<SimpleArcChannelManager<M, T, F, L>>,
1891 Arc<SimpleArcChannelManager<M, T, F, L>>,
1892 IgnoringMessageHandler,
1893 IgnoringMessageHandler
1894>;
1895
/// An [`OnionMessenger`] whose components are all held by reference.
///
/// [`KeysManager`] provides entropy and signing, and a [`SimpleRefChannelManager`] is used for
/// node id lookup as well as for handling offers, async payments and DNS resolver messages.
/// Routing uses a [`DefaultMessageRouter`] and custom messages are ignored.
///
/// This variant is used when the `dnssec` feature is enabled.
#[cfg(all(not(c_bindings), feature = "dnssec"))]
pub type SimpleRefOnionMessenger<
	'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, 'j, M, T, F, L
> = OnionMessenger<
	&'a KeysManager,
	&'a KeysManager,
	&'b L,
	&'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
	&'i DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>,
	&'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
	&'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
	&'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
	IgnoringMessageHandler,
>;
1918
1919#[cfg(not(c_bindings))]
1927#[cfg(not(feature = "dnssec"))]
1928pub type SimpleRefOnionMessenger<
1929 'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, 'j, M, T, F, L
1930> = OnionMessenger<
1931 &'a KeysManager,
1932 &'a KeysManager,
1933 &'b L,
1934 &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
1935 &'i DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>,
1936 &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
1937 &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, M, T, F, L>,
1938 IgnoringMessageHandler,
1939 IgnoringMessageHandler
1940>;
1941
1942fn packet_payloads_and_keys<T: OnionMessageContents, S: secp256k1::Signing + secp256k1::Verification>(
1945 secp_ctx: &Secp256k1<S>, unblinded_path: Vec<PublicKey>, destination: Destination, message: T,
1946 mut reply_path: Option<BlindedMessagePath>, session_priv: &SecretKey
1947) -> Result<(Vec<(Payload<T>, [u8; 32])>, Vec<onion_utils::OnionKeys>), SendError> {
1948 let num_hops = unblinded_path.len() + destination.num_hops();
1949 let mut payloads = Vec::with_capacity(num_hops);
1950 let mut onion_packet_keys = Vec::with_capacity(num_hops);
1951
1952 let (mut intro_node_id_blinding_pt, num_blinded_hops) = match &destination {
1953 Destination::Node(_) => (None, 0),
1954 Destination::BlindedPath(path) => {
1955 let introduction_node_id = match path.introduction_node() {
1956 IntroductionNode::NodeId(pubkey) => pubkey,
1957 IntroductionNode::DirectedShortChannelId(..) => {
1958 return Err(SendError::UnresolvedIntroductionNode);
1959 },
1960 };
1961 (Some((*introduction_node_id, path.blinding_point())), path.blinded_hops().len())
1962 },
1963 };
1964 let num_unblinded_hops = num_hops - num_blinded_hops;
1965
1966 let mut unblinded_path_idx = 0;
1967 let mut blinded_path_idx = 0;
1968 let mut prev_control_tlvs_ss = None;
1969 let mut final_control_tlvs = None;
1970 utils::construct_keys_for_onion_message(
1971 secp_ctx, unblinded_path.into_iter(), destination, session_priv,
1972 |_, onion_packet_ss, ephemeral_pubkey, control_tlvs_ss, unblinded_pk_opt, enc_payload_opt| {
1973 if num_unblinded_hops != 0 && unblinded_path_idx < num_unblinded_hops {
1974 if let Some(ss) = prev_control_tlvs_ss.take() {
1975 payloads.push((Payload::Forward(ForwardControlTlvs::Unblinded(
1976 ForwardTlvs {
1977 next_hop: NextMessageHop::NodeId(unblinded_pk_opt.unwrap()),
1978 next_blinding_override: None,
1979 }
1980 )), ss));
1981 }
1982 prev_control_tlvs_ss = Some(control_tlvs_ss);
1983 unblinded_path_idx += 1;
1984 } else if let Some((intro_node_id, blinding_pt)) = intro_node_id_blinding_pt.take() {
1985 if let Some(control_tlvs_ss) = prev_control_tlvs_ss.take() {
1986 payloads.push((Payload::Forward(ForwardControlTlvs::Unblinded(ForwardTlvs {
1987 next_hop: NextMessageHop::NodeId(intro_node_id),
1988 next_blinding_override: Some(blinding_pt),
1989 })), control_tlvs_ss));
1990 }
1991 }
1992 if blinded_path_idx < num_blinded_hops.saturating_sub(1) && enc_payload_opt.is_some() {
1993 payloads.push((Payload::Forward(ForwardControlTlvs::Blinded(enc_payload_opt.unwrap())),
1994 control_tlvs_ss));
1995 blinded_path_idx += 1;
1996 } else if let Some(encrypted_payload) = enc_payload_opt {
1997 final_control_tlvs = Some(ReceiveControlTlvs::Blinded(encrypted_payload));
1998 prev_control_tlvs_ss = Some(control_tlvs_ss);
1999 }
2000
2001 let (rho, mu) = onion_utils::gen_rho_mu_from_shared_secret(onion_packet_ss.as_ref());
2002 onion_packet_keys.push(onion_utils::OnionKeys {
2003 #[cfg(test)]
2004 shared_secret: onion_packet_ss,
2005 #[cfg(test)]
2006 blinding_factor: [0; 32],
2007 ephemeral_pubkey,
2008 rho,
2009 mu,
2010 });
2011 }
2012 ).map_err(|e| SendError::Secp256k1(e))?;
2013
2014 if let Some(control_tlvs) = final_control_tlvs {
2015 payloads.push((Payload::Receive {
2016 control_tlvs,
2017 reply_path: reply_path.take(),
2018 message,
2019 }, prev_control_tlvs_ss.unwrap()));
2020 } else {
2021 payloads.push((Payload::Receive {
2022 control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { context: None }),
2023 reply_path: reply_path.take(),
2024 message,
2025 }, prev_control_tlvs_ss.unwrap()));
2026 }
2027
2028 Ok((payloads, onion_packet_keys))
2029}
2030
2031fn construct_onion_message_packet<T: OnionMessageContents>(payloads: Vec<(Payload<T>, [u8; 32])>, onion_keys: Vec<onion_utils::OnionKeys>, prng_seed: [u8; 32]) -> Result<Packet, ()> {
2033 let payloads_ser_len = onion_utils::payloads_serialized_length(&payloads);
2038 let hop_data_len = if payloads_ser_len <= SMALL_PACKET_HOP_DATA_LEN {
2039 SMALL_PACKET_HOP_DATA_LEN
2040 } else if payloads_ser_len <= BIG_PACKET_HOP_DATA_LEN {
2041 BIG_PACKET_HOP_DATA_LEN
2042 } else { return Err(()) };
2043
2044 onion_utils::construct_onion_message_packet::<_, _>(
2045 payloads, onion_keys, prng_seed, hop_data_len)
2046}