lightning/routing/
utxo.rs

1// This file is Copyright its original authors, visible in version control
2// history.
3//
4// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7// You may not use this file except in accordance with one or both of these
8// licenses.
9
10//! This module contains traits for LDK to access UTXOs to check gossip data is correct.
11//!
12//! When lightning nodes gossip channel information, they resist DoS attacks by checking that each
13//! channel matches a UTXO on-chain, requiring at least some marginal on-chain transacting in
14//! order to announce a channel. This module handles that checking.
15
16use bitcoin::amount::Amount;
17use bitcoin::constants::ChainHash;
18use bitcoin::TxOut;
19
20use bitcoin::hex::DisplayHex;
21
22use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
23use crate::ln::msgs::{self, ErrorAction, LightningError, MessageSendEvent};
24use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
25use crate::util::logger::{Level, Logger};
26
27use crate::prelude::*;
28
29use crate::sync::{LockTestExt, Mutex};
30use alloc::sync::{Arc, Weak};
31use core::ops::Deref;
32
/// An error when accessing the chain via [`UtxoLookup`].
///
/// This is the `Err` payload carried by [`UtxoResult::Sync`] and by the `result` passed to
/// [`UtxoFuture::resolve`].
#[derive(Clone, Debug)]
pub enum UtxoLookupError {
	/// The requested chain is unknown.
	UnknownChain,

	/// The requested transaction doesn't exist or hasn't confirmed.
	UnknownTx,
}
42
/// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously,
/// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async`
/// variant.
#[derive(Clone)]
pub enum UtxoResult {
	/// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
	/// requested or a [`UtxoLookupError`].
	Sync(Result<TxOut, UtxoLookupError>),
	/// A result which will be resolved asynchronously. It includes a [`UtxoFuture`], a `clone` of
	/// which you must keep locally and call [`UtxoFuture::resolve`] on once the lookup completes.
	///
	/// Note that in order to avoid runaway memory usage, the number of parallel checks is limited,
	/// but only fairly loosely. Because pending checks block all message processing, leaving
	/// checks pending for an extended time may cause DoS of other functions. It is recommended you
	/// keep a tight timeout on lookups, on the order of a few seconds.
	Async(UtxoFuture),
}
60
/// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs.
pub trait UtxoLookup {
	/// Returns the transaction output of a funding transaction encoded by [`short_channel_id`].
	/// Returns an error if `chain_hash` is for a different chain or if such a transaction output is
	/// unknown.
	///
	/// The lookup may complete inline ([`UtxoResult::Sync`]) or be deferred
	/// ([`UtxoResult::Async`]).
	///
	/// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id
	fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult;
}
70
71enum ChannelAnnouncement {
72	Full(msgs::ChannelAnnouncement),
73	Unsigned(msgs::UnsignedChannelAnnouncement),
74}
75impl ChannelAnnouncement {
76	fn node_id_1(&self) -> &NodeId {
77		match self {
78			ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
79			ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
80		}
81	}
82}
83
84enum NodeAnnouncement {
85	Full(msgs::NodeAnnouncement),
86	Unsigned(msgs::UnsignedNodeAnnouncement),
87}
88impl NodeAnnouncement {
89	fn timestamp(&self) -> u32 {
90		match self {
91			NodeAnnouncement::Full(msg) => msg.contents.timestamp,
92			NodeAnnouncement::Unsigned(msg) => msg.timestamp,
93		}
94	}
95}
96
97enum ChannelUpdate {
98	Full(msgs::ChannelUpdate),
99	Unsigned(msgs::UnsignedChannelUpdate),
100}
101impl ChannelUpdate {
102	fn timestamp(&self) -> u32 {
103		match self {
104			ChannelUpdate::Full(msg) => msg.contents.timestamp,
105			ChannelUpdate::Unsigned(msg) => msg.timestamp,
106		}
107	}
108}
109
/// The shared state behind a [`UtxoFuture`]: the lookup result (if it arrived early) plus the
/// gossip messages being held until the lookup completes.
struct UtxoMessages {
	/// Set when the future is resolved while `channel_announce` is still `None`; taken by
	/// `PendingChecks::check_channel_announcement`, which then handles the result in-line.
	complete: Option<Result<TxOut, UtxoLookupError>>,
	/// The announcement awaiting the lookup. Set by `PendingChecks::check_channel_announcement`
	/// when the lookup goes async and taken again when the future is resolved.
	channel_announce: Option<ChannelAnnouncement>,
	/// Latest held `node_announcement` whose `node_id` equals the announcement's `node_id_1`.
	latest_node_announce_a: Option<NodeAnnouncement>,
	/// Latest held `node_announcement` for any other `node_id`.
	latest_node_announce_b: Option<NodeAnnouncement>,
	/// Latest held `channel_update` whose `channel_flags` has the low bit set.
	latest_channel_update_a: Option<ChannelUpdate>,
	/// Latest held `channel_update` whose `channel_flags` has the low bit clear.
	latest_channel_update_b: Option<ChannelUpdate>,
}
118
/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
///
/// See [`UtxoResult::Async`] and [`UtxoFuture::resolve`] for more info.
#[derive(Clone)]
pub struct UtxoFuture {
	/// Shared with every clone of this future; the pending-check maps only hold `Weak`
	/// references to it.
	state: Arc<Mutex<UtxoMessages>>,
}
126
127/// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph
128/// once we have a concrete resolution of a request.
129pub(crate) struct UtxoResolver(Result<TxOut, UtxoLookupError>);
130impl UtxoLookup for UtxoResolver {
131	fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult {
132		UtxoResult::Sync(self.0.clone())
133	}
134}
135
136impl UtxoFuture {
137	/// Builds a new future for later resolution.
138	#[rustfmt::skip]
139	pub fn new() -> Self {
140		Self { state: Arc::new(Mutex::new(UtxoMessages {
141			complete: None,
142			channel_announce: None,
143			latest_node_announce_a: None,
144			latest_node_announce_b: None,
145			latest_channel_update_a: None,
146			latest_channel_update_b: None,
147		}))}
148	}
149
150	/// Resolves this future against the given `graph` and with the given `result`.
151	///
152	/// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling
153	/// forwarding the validated gossip message onwards to peers.
154	///
155	/// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
156	/// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
157	/// after this.
158	///
159	/// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
160	/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
161	pub fn resolve_without_forwarding<L: Deref>(
162		&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>,
163	) where
164		L::Target: Logger,
165	{
166		self.do_resolve(graph, result);
167	}
168
169	/// Resolves this future against the given `graph` and with the given `result`.
170	///
171	/// The given `gossip` is used to broadcast any validated messages onwards to all peers which
172	/// have available buffer space.
173	///
174	/// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
175	/// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
176	/// after this.
177	///
178	/// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
179	/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
180	pub fn resolve<
181		L: Deref,
182		G: Deref<Target = NetworkGraph<L>>,
183		U: Deref,
184		GS: Deref<Target = P2PGossipSync<G, U, L>>,
185	>(
186		&self, graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>,
187	) where
188		L::Target: Logger,
189		U::Target: UtxoLookup,
190	{
191		let mut res = self.do_resolve(graph, result);
192		for msg_opt in res.iter_mut() {
193			if let Some(msg) = msg_opt.take() {
194				gossip.forward_gossip_msg(msg);
195			}
196		}
197	}
198
199	#[rustfmt::skip]
200	fn do_resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
201	-> [Option<MessageSendEvent>; 5] where L::Target: Logger {
202		let (announcement, node_a, node_b, update_a, update_b) = {
203			let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
204			let mut async_messages = self.state.lock().unwrap();
205
206			if async_messages.channel_announce.is_none() {
207				// We raced returning to `check_channel_announcement` which hasn't updated
208				// `channel_announce` yet. That's okay, we can set the `complete` field which it will
209				// check once it gets control again.
210				async_messages.complete = Some(result);
211				return [None, None, None, None, None];
212			}
213
214			let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
215				ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
216				ChannelAnnouncement::Unsigned(msg) => &msg,
217			};
218
219			pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
220
221			(async_messages.channel_announce.take().unwrap(),
222				async_messages.latest_node_announce_a.take(),
223				async_messages.latest_node_announce_b.take(),
224				async_messages.latest_channel_update_a.take(),
225				async_messages.latest_channel_update_b.take())
226		};
227
228		let mut res = [None, None, None, None, None];
229		let mut res_idx = 0;
230
231		// Now that we've updated our internal state, pass the pending messages back through the
232		// network graph with a different `UtxoLookup` which will resolve immediately.
233		// Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do
234		// with them.
235		let resolver = UtxoResolver(result);
236		match announcement {
237			ChannelAnnouncement::Full(signed_msg) => {
238				if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() {
239					res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement {
240						msg: signed_msg, update_msg: None,
241					});
242					res_idx += 1;
243				}
244			},
245			ChannelAnnouncement::Unsigned(msg) => {
246				let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
247			},
248		}
249
250		for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
251			match announce {
252				Some(NodeAnnouncement::Full(signed_msg)) => {
253					if graph.update_node_from_announcement(&signed_msg).is_ok() {
254						res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement {
255							msg: signed_msg,
256						});
257						res_idx += 1;
258					}
259				},
260				Some(NodeAnnouncement::Unsigned(msg)) => {
261					let _ = graph.update_node_from_unsigned_announcement(&msg);
262				},
263				None => {},
264			}
265		}
266
267		for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
268			match update {
269				Some(ChannelUpdate::Full(signed_msg)) => {
270					if graph.update_channel(&signed_msg).is_ok() {
271						res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate {
272							msg: signed_msg,
273						});
274						res_idx += 1;
275					}
276				},
277				Some(ChannelUpdate::Unsigned(msg)) => {
278					let _ = graph.update_channel_unsigned(&msg);
279				},
280				None => {},
281			}
282		}
283
284		res
285	}
286}
287
288struct PendingChecksContext {
289	channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
290	nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
291}
292
293impl PendingChecksContext {
294	#[rustfmt::skip]
295	fn lookup_completed(&mut self,
296		msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak<Mutex<UtxoMessages>>
297	) {
298		if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) {
299			if Weak::ptr_eq(e.get(), &completed_state) {
300				e.remove();
301			}
302		}
303
304		if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) {
305			e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
306			if e.get().is_empty() { e.remove(); }
307		}
308		if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) {
309			e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
310			if e.get().is_empty() { e.remove(); }
311		}
312	}
313}
314
/// A set of messages which are pending UTXO lookups for processing.
pub(super) struct PendingChecks {
	/// The pending-lookup maps, guarded by a single mutex.
	internal: Mutex<PendingChecksContext>,
}
319
320impl PendingChecks {
321	#[rustfmt::skip]
322	pub(super) fn new() -> Self {
323		PendingChecks { internal: Mutex::new(PendingChecksContext {
324			channels: new_hash_map(), nodes: new_hash_map(),
325		}) }
326	}
327
328	/// Checks if there is a pending `channel_update` UTXO validation for the given channel,
329	/// and, if so, stores the channel message for handling later and returns an `Err`.
330	#[rustfmt::skip]
331	pub(super) fn check_hold_pending_channel_update(
332		&self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
333	) -> Result<(), LightningError> {
334		let mut pending_checks = self.internal.lock().unwrap();
335		if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
336			let is_from_a = (msg.channel_flags & 1) == 1;
337			match Weak::upgrade(e.get()) {
338				Some(msgs_ref) => {
339					let mut messages = msgs_ref.lock().unwrap();
340					let latest_update = if is_from_a {
341							&mut messages.latest_channel_update_a
342						} else {
343							&mut messages.latest_channel_update_b
344						};
345					if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
346						// If the messages we got has a higher timestamp, just blindly assume the
347						// signatures on the new message are correct and drop the old message. This
348						// may cause us to end up dropping valid `channel_update`s if a peer is
349						// malicious, but we should get the correct ones when the node updates them.
350						*latest_update = Some(
351							if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
352							else { ChannelUpdate::Unsigned(msg.clone()) });
353					}
354					return Err(LightningError {
355						err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
356						action: ErrorAction::IgnoreAndLog(Level::Gossip),
357					});
358				},
359				None => { e.remove(); },
360			}
361		}
362		Ok(())
363	}
364
365	/// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
366	/// given node and, if so, stores the channel message for handling later and returns an `Err`.
367	#[rustfmt::skip]
368	pub(super) fn check_hold_pending_node_announcement(
369		&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
370	) -> Result<(), LightningError> {
371		let mut pending_checks = self.internal.lock().unwrap();
372		if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
373			let mut found_at_least_one_chan = false;
374			e.get_mut().retain(|node_msgs| {
375				match Weak::upgrade(&node_msgs) {
376					Some(chan_mtx) => {
377						let mut chan_msgs = chan_mtx.lock().unwrap();
378						if let Some(chan_announce) = &chan_msgs.channel_announce {
379							let latest_announce =
380								if *chan_announce.node_id_1() == msg.node_id {
381									&mut chan_msgs.latest_node_announce_a
382								} else {
383									&mut chan_msgs.latest_node_announce_b
384								};
385							if latest_announce.is_none() ||
386								latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
387							{
388								*latest_announce = Some(
389									if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
390									else { NodeAnnouncement::Unsigned(msg.clone()) });
391							}
392							found_at_least_one_chan = true;
393							true
394						} else {
395							debug_assert!(false, "channel_announce is set before struct is added to node map");
396							false
397						}
398					},
399					None => false,
400				}
401			});
402			if e.get().is_empty() { e.remove(); }
403			if found_at_least_one_chan {
404				return Err(LightningError {
405					err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
406					action: ErrorAction::IgnoreAndLog(Level::Gossip),
407				});
408			}
409		}
410		Ok(())
411	}
412
413	#[rustfmt::skip]
414	fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
415		full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
416		pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
417	) -> Result<(), msgs::LightningError> {
418		match pending_channels.entry(msg.short_channel_id) {
419			hash_map::Entry::Occupied(mut e) => {
420				// There's already a pending lookup for the given SCID. Check if the messages
421				// are the same and, if so, return immediately (don't bother spawning another
422				// lookup if we haven't gotten that far yet).
423				match Weak::upgrade(&e.get()) {
424					Some(pending_msgs) => {
425						// This may be called with the mutex held on a different UtxoMessages
426						// struct, however in that case we have a global lockorder of new messages
427						// -> old messages, which makes this safe.
428						let pending_matches = match &pending_msgs.unsafe_well_ordered_double_lock_self().channel_announce {
429							Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
430							Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
431							None => {
432								// This shouldn't actually be reachable. We set the
433								// `channel_announce` field under the same lock as setting the
434								// channel map entry. Still, we can just treat it as
435								// non-matching and let the new request fly.
436								debug_assert!(false);
437								false
438							},
439						};
440						if pending_matches {
441							return Err(LightningError {
442								err: "Channel announcement is already being checked".to_owned(),
443								action: ErrorAction::IgnoreDuplicateGossip,
444							});
445						} else {
446							// The earlier lookup is a different message. If we have another
447							// request in-flight now replace the original.
448							// Note that in the replace case whether to replace is somewhat
449							// arbitrary - both results will be handled, we're just updating the
450							// value that will be compared to future lookups with the same SCID.
451							if let Some(item) = replacement {
452								*e.get_mut() = item;
453							}
454						}
455					},
456					None => {
457						// The earlier lookup already resolved. We can't be sure its the same
458						// so just remove/replace it and move on.
459						if let Some(item) = replacement {
460							*e.get_mut() = item;
461						} else { e.remove(); }
462					},
463				}
464			},
465			hash_map::Entry::Vacant(v) => {
466				if let Some(item) = replacement { v.insert(item); }
467			},
468		}
469		Ok(())
470	}
471
472	#[rustfmt::skip]
473	pub(super) fn check_channel_announcement<U: Deref>(&self,
474		utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement,
475		full_msg: Option<&msgs::ChannelAnnouncement>
476	) -> Result<Option<Amount>, msgs::LightningError> where U::Target: UtxoLookup {
477		let handle_result = |res| {
478			match res {
479				Ok(TxOut { value, script_pubkey }) => {
480					let expected_script =
481						make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_array(), msg.bitcoin_key_2.as_array()).to_p2wsh();
482					if script_pubkey != expected_script {
483						return Err(LightningError{
484							err: format!("Channel announcement key ({}) didn't match on-chain script ({})",
485								expected_script.to_hex_string(), script_pubkey.to_hex_string()),
486							action: ErrorAction::IgnoreError
487						});
488					}
489					Ok(Some(value))
490				},
491				Err(UtxoLookupError::UnknownChain) => {
492					Err(LightningError {
493						err: format!("Channel announced on an unknown chain ({})",
494							msg.chain_hash.to_bytes().as_hex()),
495						action: ErrorAction::IgnoreError
496					})
497				},
498				Err(UtxoLookupError::UnknownTx) => {
499					Err(LightningError {
500						err: "Channel announced without corresponding UTXO entry".to_owned(),
501						action: ErrorAction::IgnoreError
502					})
503				},
504			}
505		};
506
507		Self::check_replace_previous_entry(msg, full_msg, None,
508			&mut self.internal.lock().unwrap().channels)?;
509
510		match utxo_lookup {
511			&None => {
512				// Tentatively accept, potentially exposing us to DoS attacks
513				Ok(None)
514			},
515			&Some(ref utxo_lookup) => {
516				match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
517					UtxoResult::Sync(res) => handle_result(res),
518					UtxoResult::Async(future) => {
519						let mut pending_checks = self.internal.lock().unwrap();
520						let mut async_messages = future.state.lock().unwrap();
521						if let Some(res) = async_messages.complete.take() {
522							// In the unlikely event the future resolved before we managed to get it,
523							// handle the result in-line.
524							handle_result(res)
525						} else {
526							Self::check_replace_previous_entry(msg, full_msg,
527								Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?;
528							async_messages.channel_announce = Some(
529								if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
530								else { ChannelAnnouncement::Unsigned(msg.clone()) });
531							pending_checks.nodes.entry(msg.node_id_1)
532								.or_default().push(Arc::downgrade(&future.state));
533							pending_checks.nodes.entry(msg.node_id_2)
534								.or_default().push(Arc::downgrade(&future.state));
535							Err(LightningError {
536								err: "Channel being checked async".to_owned(),
537								action: ErrorAction::IgnoreAndLog(Level::Gossip),
538							})
539						}
540					},
541				}
542			}
543		}
544	}
545
	/// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`]
	/// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending -
	/// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s
	/// which we'll have to process. With a socket buffer of 4KB and a minimum
	/// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10 * our peer
	/// count` messages to process beyond this limit. Because we'll probably have a few peers,
	/// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight
	/// checks should be more than enough for decent parallelism.
	const MAX_PENDING_LOOKUPS: usize = 32;
555
556	/// Returns true if there are a large number of async checks pending and future
557	/// `channel_announcement` messages should be delayed. Note that this is only a hint and
558	/// messages already in-flight may still have to be handled for various reasons.
559	#[rustfmt::skip]
560	pub(super) fn too_many_checks_pending(&self) -> bool {
561		let mut pending_checks = self.internal.lock().unwrap();
562		if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
563			// If we have many channel checks pending, ensure we don't have any dangling checks
564			// (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture`
565			// instead) before we commit to applying backpressure.
566			pending_checks.channels.retain(|_, chan| {
567				Weak::upgrade(&chan).is_some()
568			});
569			pending_checks.nodes.retain(|_, channels| {
570				channels.retain(|chan| Weak::upgrade(&chan).is_some());
571				!channels.is_empty()
572			});
573			pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
574		} else {
575			false
576		}
577	}
578}
579
580#[cfg(test)]
581mod tests {
582	use super::*;
583	use crate::routing::gossip::tests::*;
584	use crate::util::test_utils::{TestChainSource, TestLogger};
585
586	use bitcoin::amount::Amount;
587	use bitcoin::secp256k1::{Secp256k1, SecretKey};
588
589	use core::sync::atomic::Ordering;
590
591	fn get_network() -> (TestChainSource, NetworkGraph<Box<TestLogger>>) {
592		let logger = Box::new(TestLogger::new());
593		let chain_source = TestChainSource::new(bitcoin::Network::Testnet);
594		let network_graph = NetworkGraph::new(bitcoin::Network::Testnet, logger);
595
596		(chain_source, network_graph)
597	}
598
599	#[rustfmt::skip]
600	fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource,
601		NetworkGraph<Box<TestLogger>>, bitcoin::ScriptBuf, msgs::NodeAnnouncement,
602		msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate)
603	{
604		let secp_ctx = Secp256k1::new();
605
606		let (chain_source, network_graph) = get_network();
607
608		let good_script = get_channel_script(&secp_ctx);
609		let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
610		let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
611		let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
612
613		let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
614		let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx);
615
616		// Note that we have to set the "direction" flag correctly on both messages
617		let chan_update_a = get_signed_channel_update(|msg| msg.channel_flags = 0, node_1_privkey, &secp_ctx);
618		let chan_update_b = get_signed_channel_update(|msg| msg.channel_flags = 1, node_2_privkey, &secp_ctx);
619		let chan_update_c = get_signed_channel_update(|msg| {
620			msg.channel_flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx);
621
622		(valid_announcement, chain_source, network_graph, good_script, node_a_announce,
623			node_b_announce, chan_update_a, chan_update_b, chan_update_c)
624	}
625
626	#[test]
627	#[rustfmt::skip]
628	fn test_fast_async_lookup() {
629		// Check that async lookups which resolve quicker than the future is returned to the
630		// `get_utxo` call can read it still resolve properly.
631		let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
632
633		let future = UtxoFuture::new();
634		future.resolve_without_forwarding(&network_graph,
635			Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
636		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
637
638		network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap();
639		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some());
640	}
641
642	#[test]
643	#[rustfmt::skip]
644	fn test_async_lookup() {
645		// Test a simple async lookup
646		let (valid_announcement, chain_source, network_graph, good_script,
647			node_a_announce, node_b_announce, ..) = get_test_objects();
648
649		let future = UtxoFuture::new();
650		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
651
652		assert_eq!(
653			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
654			"Channel being checked async");
655		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
656
657		future.resolve_without_forwarding(&network_graph,
658			Ok(TxOut { value: Amount::ZERO, script_pubkey: good_script }));
659		network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
660		network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
661
662		assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
663			.unwrap().announcement_info.is_none());
664
665		network_graph.update_node_from_announcement(&node_a_announce).unwrap();
666		network_graph.update_node_from_announcement(&node_b_announce).unwrap();
667
668		assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
669			.unwrap().announcement_info.is_some());
670	}
671
672	#[test]
673	#[rustfmt::skip]
674	fn test_invalid_async_lookup() {
675		// Test an async lookup which returns an incorrect script
676		let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
677
678		let future = UtxoFuture::new();
679		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
680
681		assert_eq!(
682			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
683			"Channel being checked async");
684		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
685
686		future.resolve_without_forwarding(&network_graph,
687			Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: bitcoin::ScriptBuf::new() }));
688		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
689	}
690
691	#[test]
692	#[rustfmt::skip]
693	fn test_failing_async_lookup() {
694		// Test an async lookup which returns an error
695		let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
696
697		let future = UtxoFuture::new();
698		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
699
700		assert_eq!(
701			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
702			"Channel being checked async");
703		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
704
705		future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
706		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
707	}
708
709	#[test]
710	#[rustfmt::skip]
711	fn test_updates_async_lookup() {
712		// Test async lookups will process pending channel_update/node_announcements once they
713		// complete.
714		let (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
715			node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects();
716
717		let future = UtxoFuture::new();
718		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
719
720		assert_eq!(
721			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
722			"Channel being checked async");
723		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
724
725		assert_eq!(
726			network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err,
727			"Awaiting channel_announcement validation to accept node_announcement");
728		assert_eq!(
729			network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err,
730			"Awaiting channel_announcement validation to accept node_announcement");
731
732		assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
733			"Awaiting channel_announcement validation to accept channel_update");
734		assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
735			"Awaiting channel_announcement validation to accept channel_update");
736
737		future.resolve_without_forwarding(&network_graph,
738			Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
739
740		assert!(network_graph.read_only().channels()
741			.get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some());
742		assert!(network_graph.read_only().channels()
743			.get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some());
744
745		assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
746			.unwrap().announcement_info.is_some());
747		assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
748			.unwrap().announcement_info.is_some());
749	}
750
751	#[test]
752	#[rustfmt::skip]
753	fn test_latest_update_async_lookup() {
754		// Test async lookups will process the latest channel_update if two are received while
755		// awaiting an async UTXO lookup.
756		let (valid_announcement, chain_source, network_graph, good_script, _,
757			_, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects();
758
759		let future = UtxoFuture::new();
760		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
761
762		assert_eq!(
763			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
764			"Channel being checked async");
765		assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
766
767		assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
768			"Awaiting channel_announcement validation to accept channel_update");
769		assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
770			"Awaiting channel_announcement validation to accept channel_update");
771		assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err,
772			"Awaiting channel_announcement validation to accept channel_update");
773
774		future.resolve_without_forwarding(&network_graph,
775			Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
776
777		assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp);
778		let graph_lock = network_graph.read_only();
779		assert!(graph_lock.channels()
780				.get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
781				.one_to_two.as_ref().unwrap().last_update !=
782			graph_lock.channels()
783				.get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
784				.two_to_one.as_ref().unwrap().last_update);
785	}
786
787	#[test]
788	#[rustfmt::skip]
789	fn test_no_double_lookups() {
790		// Test that a pending async lookup will prevent a second async lookup from flying, but
791		// only if the channel_announcement message is identical.
792		let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
793
794		let future = UtxoFuture::new();
795		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
796
797		assert_eq!(
798			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
799			"Channel being checked async");
800		assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
801
802		// If we make a second request with the same message, the call count doesn't increase...
803		let future_b = UtxoFuture::new();
804		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone());
805		assert_eq!(
806			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
807			"Channel announcement is already being checked");
808		assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
809
810		// But if we make a third request with a tweaked message, we should get a second call
811		// against our new future...
812		let secp_ctx = Secp256k1::new();
813		let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap();
814		let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap();
815		let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx);
816		assert_eq!(
817			network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err,
818			"Channel being checked async");
819		assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2);
820
821		// Still, if we resolve the original future, the original channel will be accepted.
822		future.resolve_without_forwarding(&network_graph,
823			Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script }));
824		assert!(!network_graph.read_only().channels()
825			.get(&valid_announcement.contents.short_channel_id).unwrap()
826			.announcement_message.as_ref().unwrap()
827			.contents.features.supports_unknown_test_feature());
828	}
829
830	#[test]
831	#[rustfmt::skip]
832	fn test_checks_backpressure() {
833		// Test that too_many_checks_pending returns true when there are many checks pending, and
834		// returns false once they complete.
835		let secp_ctx = Secp256k1::new();
836		let (chain_source, network_graph) = get_network();
837
838		// We cheat and use a single future for all the lookups to complete them all at once.
839		let future = UtxoFuture::new();
840		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
841
842		let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
843		let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
844
845		for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
846			let valid_announcement = get_signed_channel_announcement(
847				|msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
848			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
849			assert!(!network_graph.pending_checks.too_many_checks_pending());
850		}
851
852		let valid_announcement = get_signed_channel_announcement(
853			|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
854		network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
855		assert!(network_graph.pending_checks.too_many_checks_pending());
856
857		// Once the future completes the "too many checks" flag should reset.
858		future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
859		assert!(!network_graph.pending_checks.too_many_checks_pending());
860	}
861
862	#[test]
863	#[rustfmt::skip]
864	fn test_checks_backpressure_drop() {
865		// Test that too_many_checks_pending returns true when there are many checks pending, and
866		// returns false if we drop some of the futures without completion.
867		let secp_ctx = Secp256k1::new();
868		let (chain_source, network_graph) = get_network();
869
870		// We cheat and use a single future for all the lookups to complete them all at once.
871		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new());
872
873		let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
874		let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
875
876		for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
877			let valid_announcement = get_signed_channel_announcement(
878				|msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
879			network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
880			assert!(!network_graph.pending_checks.too_many_checks_pending());
881		}
882
883		let valid_announcement = get_signed_channel_announcement(
884			|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
885		network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
886		assert!(network_graph.pending_checks.too_many_checks_pending());
887
888		// Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
889		// should reset to false.
890		*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
891		assert!(!network_graph.pending_checks.too_many_checks_pending());
892	}
893}