1use bitcoin::amount::Amount;
17use bitcoin::constants::ChainHash;
18use bitcoin::TxOut;
19
20use bitcoin::hex::DisplayHex;
21
22use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
23use crate::ln::msgs::{self, ErrorAction, LightningError, MessageSendEvent};
24use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
25use crate::util::logger::{Level, Logger};
26
27use crate::prelude::*;
28
29use crate::sync::{LockTestExt, Mutex};
30use alloc::sync::{Arc, Weak};
31use core::ops::Deref;
32
/// Reasons a UTXO lookup can fail.
#[derive(Clone, Debug)]
pub enum UtxoLookupError {
    /// Surfaced by `PendingChecks::check_channel_announcement` as
    /// "Channel announced on an unknown chain".
    UnknownChain,

    /// Surfaced by `PendingChecks::check_channel_announcement` as
    /// "Channel announced without corresponding UTXO entry".
    UnknownTx,
}
42
/// The answer returned from `UtxoLookup::get_utxo`: either a result that is already available or
/// a handle through which the result is delivered later.
#[derive(Clone)]
pub enum UtxoResult {
    /// The lookup finished immediately with the given output or error.
    Sync(Result<TxOut, UtxoLookupError>),
    /// The lookup is still outstanding; the result is supplied later through the contained
    /// `UtxoFuture` (see `UtxoFuture::resolve` / `UtxoFuture::resolve_without_forwarding`).
    Async(UtxoFuture),
}
60
/// Source of on-chain output data used when checking channel announcements.
pub trait UtxoLookup {
    /// Looks up the output for the channel identified by `short_channel_id` on the chain
    /// identified by `chain_hash`.
    ///
    /// The implementation may answer right away (`UtxoResult::Sync`) or return a `UtxoFuture`
    /// wrapped in `UtxoResult::Async` and resolve it later.
    fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult;
}
70
71enum ChannelAnnouncement {
72 Full(msgs::ChannelAnnouncement),
73 Unsigned(msgs::UnsignedChannelAnnouncement),
74}
75impl ChannelAnnouncement {
76 fn node_id_1(&self) -> &NodeId {
77 match self {
78 ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
79 ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
80 }
81 }
82}
83
84enum NodeAnnouncement {
85 Full(msgs::NodeAnnouncement),
86 Unsigned(msgs::UnsignedNodeAnnouncement),
87}
88impl NodeAnnouncement {
89 fn timestamp(&self) -> u32 {
90 match self {
91 NodeAnnouncement::Full(msg) => msg.contents.timestamp,
92 NodeAnnouncement::Unsigned(msg) => msg.timestamp,
93 }
94 }
95}
96
97enum ChannelUpdate {
98 Full(msgs::ChannelUpdate),
99 Unsigned(msgs::UnsignedChannelUpdate),
100}
101impl ChannelUpdate {
102 fn timestamp(&self) -> u32 {
103 match self {
104 ChannelUpdate::Full(msg) => msg.contents.timestamp,
105 ChannelUpdate::Unsigned(msg) => msg.timestamp,
106 }
107 }
108}
109
/// State shared between a `UtxoFuture` and the pending-check maps: the announcement being
/// validated plus the newest related gossip messages held back until the lookup finishes.
struct UtxoMessages {
    // Set by `UtxoFuture::do_resolve` when the future is resolved before `channel_announce` has
    // been stored; `PendingChecks::check_channel_announcement` takes it and handles it inline.
    complete: Option<Result<TxOut, UtxoLookupError>>,
    // The channel announcement waiting on the lookup.
    channel_announce: Option<ChannelAnnouncement>,
    // Newest held node announcement from the node matching the announcement's `node_id_1`.
    latest_node_announce_a: Option<NodeAnnouncement>,
    // Newest held node announcement from any other node id (i.e. not `node_id_1`).
    latest_node_announce_b: Option<NodeAnnouncement>,
    // Newest held channel update whose `channel_flags` has its lowest bit set.
    latest_channel_update_a: Option<ChannelUpdate>,
    // Newest held channel update whose `channel_flags` has its lowest bit clear.
    latest_channel_update_b: Option<ChannelUpdate>,
}
118
/// Handle for a UTXO lookup that will be completed later.
///
/// Cloning yields another handle to the same shared state (the state lives behind an `Arc`).
#[derive(Clone)]
pub struct UtxoFuture {
    // Shared with `PendingChecks`, which only keeps `Weak` references to it.
    state: Arc<Mutex<UtxoMessages>>,
}
126
127pub(crate) struct UtxoResolver(Result<TxOut, UtxoLookupError>);
130impl UtxoLookup for UtxoResolver {
131 fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult {
132 UtxoResult::Sync(self.0.clone())
133 }
134}
135
136impl UtxoFuture {
137 #[rustfmt::skip]
139 pub fn new() -> Self {
140 Self { state: Arc::new(Mutex::new(UtxoMessages {
141 complete: None,
142 channel_announce: None,
143 latest_node_announce_a: None,
144 latest_node_announce_b: None,
145 latest_channel_update_a: None,
146 latest_channel_update_b: None,
147 }))}
148 }
149
150 pub fn resolve_without_forwarding<L: Deref>(
162 &self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>,
163 ) where
164 L::Target: Logger,
165 {
166 self.do_resolve(graph, result);
167 }
168
169 pub fn resolve<
181 L: Deref,
182 G: Deref<Target = NetworkGraph<L>>,
183 U: Deref,
184 GS: Deref<Target = P2PGossipSync<G, U, L>>,
185 >(
186 &self, graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>,
187 ) where
188 L::Target: Logger,
189 U::Target: UtxoLookup,
190 {
191 let mut res = self.do_resolve(graph, result);
192 for msg_opt in res.iter_mut() {
193 if let Some(msg) = msg_opt.take() {
194 gossip.forward_gossip_msg(msg);
195 }
196 }
197 }
198
/// Applies `result` to the announcement held by this future and then replays the held node
/// announcements and channel updates into `graph`.
///
/// Returns up to five broadcast events, packed from index 0: one for the channel announcement,
/// up to two node announcements and up to two channel updates. Only signed (`Full`) messages
/// that `graph` accepted produce an event; unsigned messages are applied but never returned.
///
/// If no announcement has been stored yet, `result` is parked in `complete` and an all-`None`
/// array is returned.
#[rustfmt::skip]
fn do_resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
-> [Option<MessageSendEvent>; 5] where L::Target: Logger {
    let (announcement, node_a, node_b, update_a, update_b) = {
        // Lock order: the graph's pending-check map first, then this future's state. Both
        // guards are released at the end of this block, before any graph update below runs.
        let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
        let mut async_messages = self.state.lock().unwrap();

        if async_messages.channel_announce.is_none() {
            // Nothing has been stored for this future yet. Park the result so that
            // `PendingChecks::check_channel_announcement` can pick it up from `complete`.
            async_messages.complete = Some(result);
            return [None, None, None, None, None];
        }

        let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
            ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
            ChannelAnnouncement::Unsigned(msg) => &msg,
        };

        // Drop the pending-check entries that point at this future's state.
        pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));

        // Move everything out of the shared state so it can be processed without the locks.
        (async_messages.channel_announce.take().unwrap(),
            async_messages.latest_node_announce_a.take(),
            async_messages.latest_node_announce_b.take(),
            async_messages.latest_channel_update_a.take(),
            async_messages.latest_channel_update_b.take())
    };

    let mut res = [None, None, None, None, None];
    let mut res_idx = 0;

    // Re-run the announcement through the graph with a lookup that answers synchronously with
    // `result`.
    let resolver = UtxoResolver(result);
    match announcement {
        ChannelAnnouncement::Full(signed_msg) => {
            if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() {
                res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement {
                    msg: signed_msg, update_msg: None,
                });
                res_idx += 1;
            }
        },
        ChannelAnnouncement::Unsigned(msg) => {
            // Errors are deliberately ignored; there is nothing to broadcast either way.
            let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
        },
    }

    // Replay the held node announcements (a, then b).
    for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
        match announce {
            Some(NodeAnnouncement::Full(signed_msg)) => {
                if graph.update_node_from_announcement(&signed_msg).is_ok() {
                    res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement {
                        msg: signed_msg,
                    });
                    res_idx += 1;
                }
            },
            Some(NodeAnnouncement::Unsigned(msg)) => {
                let _ = graph.update_node_from_unsigned_announcement(&msg);
            },
            None => {},
        }
    }

    // Replay the held channel updates (a, then b).
    for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
        match update {
            Some(ChannelUpdate::Full(signed_msg)) => {
                if graph.update_channel(&signed_msg).is_ok() {
                    res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate {
                        msg: signed_msg,
                    });
                    res_idx += 1;
                }
            },
            Some(ChannelUpdate::Unsigned(msg)) => {
                let _ = graph.update_channel_unsigned(&msg);
            },
            None => {},
        }
    }

    res
}
286}
287
/// Bookkeeping for outstanding asynchronous lookups. Only `Weak` references are stored, so an
/// entry can outlive the `UtxoFuture` it refers to and must be checked with `Weak::upgrade`.
struct PendingChecksContext {
    // Pending lookup state keyed by short channel id.
    channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
    // For each node id, the pending lookup states of channel announcements naming that node.
    nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
}
292
293impl PendingChecksContext {
294 #[rustfmt::skip]
295 fn lookup_completed(&mut self,
296 msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak<Mutex<UtxoMessages>>
297 ) {
298 if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) {
299 if Weak::ptr_eq(e.get(), &completed_state) {
300 e.remove();
301 }
302 }
303
304 if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) {
305 e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
306 if e.get().is_empty() { e.remove(); }
307 }
308 if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) {
309 e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
310 if e.get().is_empty() { e.remove(); }
311 }
312 }
313}
314
/// Tracks channel announcements whose UTXO lookup is still outstanding, together with the
/// gossip messages being held back for them.
pub(super) struct PendingChecks {
    // All bookkeeping lives behind a single mutex.
    internal: Mutex<PendingChecksContext>,
}
319
320impl PendingChecks {
321 #[rustfmt::skip]
322 pub(super) fn new() -> Self {
323 PendingChecks { internal: Mutex::new(PendingChecksContext {
324 channels: new_hash_map(), nodes: new_hash_map(),
325 }) }
326 }
327
328 #[rustfmt::skip]
331 pub(super) fn check_hold_pending_channel_update(
332 &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
333 ) -> Result<(), LightningError> {
334 let mut pending_checks = self.internal.lock().unwrap();
335 if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
336 let is_from_a = (msg.channel_flags & 1) == 1;
337 match Weak::upgrade(e.get()) {
338 Some(msgs_ref) => {
339 let mut messages = msgs_ref.lock().unwrap();
340 let latest_update = if is_from_a {
341 &mut messages.latest_channel_update_a
342 } else {
343 &mut messages.latest_channel_update_b
344 };
345 if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
346 *latest_update = Some(
351 if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
352 else { ChannelUpdate::Unsigned(msg.clone()) });
353 }
354 return Err(LightningError {
355 err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
356 action: ErrorAction::IgnoreAndLog(Level::Gossip),
357 });
358 },
359 None => { e.remove(); },
360 }
361 }
362 Ok(())
363 }
364
365 #[rustfmt::skip]
368 pub(super) fn check_hold_pending_node_announcement(
369 &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
370 ) -> Result<(), LightningError> {
371 let mut pending_checks = self.internal.lock().unwrap();
372 if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
373 let mut found_at_least_one_chan = false;
374 e.get_mut().retain(|node_msgs| {
375 match Weak::upgrade(&node_msgs) {
376 Some(chan_mtx) => {
377 let mut chan_msgs = chan_mtx.lock().unwrap();
378 if let Some(chan_announce) = &chan_msgs.channel_announce {
379 let latest_announce =
380 if *chan_announce.node_id_1() == msg.node_id {
381 &mut chan_msgs.latest_node_announce_a
382 } else {
383 &mut chan_msgs.latest_node_announce_b
384 };
385 if latest_announce.is_none() ||
386 latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
387 {
388 *latest_announce = Some(
389 if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
390 else { NodeAnnouncement::Unsigned(msg.clone()) });
391 }
392 found_at_least_one_chan = true;
393 true
394 } else {
395 debug_assert!(false, "channel_announce is set before struct is added to node map");
396 false
397 }
398 },
399 None => false,
400 }
401 });
402 if e.get().is_empty() { e.remove(); }
403 if found_at_least_one_chan {
404 return Err(LightningError {
405 err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
406 action: ErrorAction::IgnoreAndLog(Level::Gossip),
407 });
408 }
409 }
410 Ok(())
411 }
412
413 #[rustfmt::skip]
414 fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
415 full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
416 pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
417 ) -> Result<(), msgs::LightningError> {
418 match pending_channels.entry(msg.short_channel_id) {
419 hash_map::Entry::Occupied(mut e) => {
420 match Weak::upgrade(&e.get()) {
424 Some(pending_msgs) => {
425 let pending_matches = match &pending_msgs.unsafe_well_ordered_double_lock_self().channel_announce {
429 Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
430 Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
431 None => {
432 debug_assert!(false);
437 false
438 },
439 };
440 if pending_matches {
441 return Err(LightningError {
442 err: "Channel announcement is already being checked".to_owned(),
443 action: ErrorAction::IgnoreDuplicateGossip,
444 });
445 } else {
446 if let Some(item) = replacement {
452 *e.get_mut() = item;
453 }
454 }
455 },
456 None => {
457 if let Some(item) = replacement {
460 *e.get_mut() = item;
461 } else { e.remove(); }
462 },
463 }
464 },
465 hash_map::Entry::Vacant(v) => {
466 if let Some(item) = replacement { v.insert(item); }
467 },
468 }
469 Ok(())
470 }
471
/// Checks a channel announcement against the UTXO source, if one is configured.
///
/// Returns:
/// * `Ok(None)` when `utxo_lookup` is `None` (the announcement is not checked on-chain),
/// * `Ok(Some(value))` when a result is available and the output's script matches the P2WSH
///   script built from the announcement's two bitcoin keys,
/// * `Err(..)` when the same announcement is already being checked, when the lookup failed or
///   the script did not match, or when the lookup is asynchronous and still outstanding (in
///   which case the announcement is stored on the future and registered as pending).
///
/// `full_msg`, when given, is stored in preference to `msg` for an outstanding lookup.
#[rustfmt::skip]
pub(super) fn check_channel_announcement<U: Deref>(&self,
    utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement,
    full_msg: Option<&msgs::ChannelAnnouncement>
) -> Result<Option<Amount>, msgs::LightningError> where U::Target: UtxoLookup {
    // Turns a finished lookup into the function's return value.
    let handle_result = |res| {
        match res {
            Ok(TxOut { value, script_pubkey }) => {
                let expected_script =
                    make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_array(), msg.bitcoin_key_2.as_array()).to_p2wsh();
                if script_pubkey != expected_script {
                    return Err(LightningError{
                        err: format!("Channel announcement key ({}) didn't match on-chain script ({})",
                            expected_script.to_hex_string(), script_pubkey.to_hex_string()),
                        action: ErrorAction::IgnoreError
                    });
                }
                Ok(Some(value))
            },
            Err(UtxoLookupError::UnknownChain) => {
                Err(LightningError {
                    err: format!("Channel announced on an unknown chain ({})",
                        msg.chain_hash.to_bytes().as_hex()),
                    action: ErrorAction::IgnoreError
                })
            },
            Err(UtxoLookupError::UnknownTx) => {
                Err(LightningError {
                    err: "Channel announced without corresponding UTXO entry".to_owned(),
                    action: ErrorAction::IgnoreError
                })
            },
        }
    };

    // Reject exact duplicates of an in-flight check before starting another lookup. The lock is
    // a temporary of this statement, so it is released before `get_utxo` is called below.
    Self::check_replace_previous_entry(msg, full_msg, None,
        &mut self.internal.lock().unwrap().channels)?;

    match utxo_lookup {
        &None => {
            // No UTXO source configured: nothing to check against.
            Ok(None)
        },
        &Some(ref utxo_lookup) => {
            match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
                UtxoResult::Sync(res) => handle_result(res),
                UtxoResult::Async(future) => {
                    // Lock order: pending-check map first, then the future's state.
                    let mut pending_checks = self.internal.lock().unwrap();
                    let mut async_messages = future.state.lock().unwrap();
                    if let Some(res) = async_messages.complete.take() {
                        // The future was resolved before the announcement was stored on it, so
                        // the result can be handled right away.
                        handle_result(res)
                    } else {
                        // Register (or replace) the pending entry, store the announcement on the
                        // future, and index the future under both node ids.
                        Self::check_replace_previous_entry(msg, full_msg,
                            Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?;
                        async_messages.channel_announce = Some(
                            if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
                            else { ChannelAnnouncement::Unsigned(msg.clone()) });
                        pending_checks.nodes.entry(msg.node_id_1)
                            .or_default().push(Arc::downgrade(&future.state));
                        pending_checks.nodes.entry(msg.node_id_2)
                            .or_default().push(Arc::downgrade(&future.state));
                        Err(LightningError {
                            err: "Channel being checked async".to_owned(),
                            action: ErrorAction::IgnoreAndLog(Level::Gossip),
                        })
                    }
                },
            }
        }
    }
}
545
546 const MAX_PENDING_LOOKUPS: usize = 32;
555
556 #[rustfmt::skip]
560 pub(super) fn too_many_checks_pending(&self) -> bool {
561 let mut pending_checks = self.internal.lock().unwrap();
562 if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
563 pending_checks.channels.retain(|_, chan| {
567 Weak::upgrade(&chan).is_some()
568 });
569 pending_checks.nodes.retain(|_, channels| {
570 channels.retain(|chan| Weak::upgrade(&chan).is_some());
571 !channels.is_empty()
572 });
573 pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
574 } else {
575 false
576 }
577 }
578}
579
580#[cfg(test)]
581mod tests {
582 use super::*;
583 use crate::routing::gossip::tests::*;
584 use crate::util::test_utils::{TestChainSource, TestLogger};
585
586 use bitcoin::amount::Amount;
587 use bitcoin::secp256k1::{Secp256k1, SecretKey};
588
589 use core::sync::atomic::Ordering;
590
591 fn get_network() -> (TestChainSource, NetworkGraph<Box<TestLogger>>) {
592 let logger = Box::new(TestLogger::new());
593 let chain_source = TestChainSource::new(bitcoin::Network::Testnet);
594 let network_graph = NetworkGraph::new(bitcoin::Network::Testnet, logger);
595
596 (chain_source, network_graph)
597 }
598
599 #[rustfmt::skip]
600 fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource,
601 NetworkGraph<Box<TestLogger>>, bitcoin::ScriptBuf, msgs::NodeAnnouncement,
602 msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate)
603 {
604 let secp_ctx = Secp256k1::new();
605
606 let (chain_source, network_graph) = get_network();
607
608 let good_script = get_channel_script(&secp_ctx);
609 let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
610 let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
611 let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
612
613 let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
614 let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx);
615
616 let chan_update_a = get_signed_channel_update(|msg| msg.channel_flags = 0, node_1_privkey, &secp_ctx);
618 let chan_update_b = get_signed_channel_update(|msg| msg.channel_flags = 1, node_2_privkey, &secp_ctx);
619 let chan_update_c = get_signed_channel_update(|msg| {
620 msg.channel_flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx);
621
622 (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
623 node_b_announce, chan_update_a, chan_update_b, chan_update_c)
624 }
625
#[test]
#[rustfmt::skip]
fn test_fast_async_lookup() {
    // A future that is resolved before the announcement is even processed must still lead to
    // the channel being accepted on the first call.
    let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
    let scid = valid_announcement.contents.short_channel_id;

    let future = UtxoFuture::new();
    let funding_output = TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script };
    future.resolve_without_forwarding(&network_graph, Ok(funding_output));
    *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

    network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap();
    assert!(network_graph.read_only().channels().get(&scid).is_some());
}
641
#[test]
#[rustfmt::skip]
fn test_async_lookup() {
    // A plain asynchronous lookup: the channel is held back until the future resolves, after
    // which node announcements for both endpoints are accepted normally.
    let (valid_announcement, chain_source, network_graph, good_script,
        node_a_announce, node_b_announce, ..) = get_test_objects();
    let scid = valid_announcement.contents.short_channel_id;

    let future = UtxoFuture::new();
    *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

    assert_eq!(
        network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
        "Channel being checked async");
    assert!(network_graph.read_only().channels().get(&scid).is_none());

    future.resolve_without_forwarding(&network_graph,
        Ok(TxOut { value: Amount::ZERO, script_pubkey: good_script }));
    // The channel must now be present. (This lookup used to be written out twice verbatim.)
    assert!(network_graph.read_only().channels().get(&scid).is_some());

    // Neither endpoint has announcement info until its node announcement is delivered.
    assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
        .unwrap().announcement_info.is_none());
    assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
        .unwrap().announcement_info.is_none());

    network_graph.update_node_from_announcement(&node_a_announce).unwrap();
    network_graph.update_node_from_announcement(&node_b_announce).unwrap();

    // Check both endpoints, not just the first.
    assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
        .unwrap().announcement_info.is_some());
    assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
        .unwrap().announcement_info.is_some());
}
671
#[test]
#[rustfmt::skip]
fn test_invalid_async_lookup() {
    // An asynchronous lookup that resolves to an output with the wrong script must not add the
    // channel to the graph.
    let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
    let scid = valid_announcement.contents.short_channel_id;
    let channel_known = || network_graph.read_only().channels().get(&scid).is_some();

    let future = UtxoFuture::new();
    *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

    let pending_err = network_graph
        .update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
        .unwrap_err();
    assert_eq!(pending_err.err, "Channel being checked async");
    assert!(!channel_known());

    let wrong_output =
        TxOut { value: Amount::from_sat(1_000_000), script_pubkey: bitcoin::ScriptBuf::new() };
    future.resolve_without_forwarding(&network_graph, Ok(wrong_output));
    assert!(!channel_known());
}
690
#[test]
#[rustfmt::skip]
fn test_failing_async_lookup() {
    // An asynchronous lookup that resolves to an error must not add the channel to the graph.
    let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
    let scid = valid_announcement.contents.short_channel_id;
    let channel_known = || network_graph.read_only().channels().get(&scid).is_some();

    let future = UtxoFuture::new();
    *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

    let pending_err = network_graph
        .update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
        .unwrap_err();
    assert_eq!(pending_err.err, "Channel being checked async");
    assert!(!channel_known());

    future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
    assert!(!channel_known());
}
708
#[test]
#[rustfmt::skip]
fn test_updates_async_lookup() {
    // Node announcements and channel updates that arrive while the lookup is outstanding are
    // held back, then applied once the future resolves successfully.
    let (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
        node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects();
    let scid = valid_announcement.contents.short_channel_id;

    let future = UtxoFuture::new();
    *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

    let pending_err = network_graph
        .update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
        .unwrap_err();
    assert_eq!(pending_err.err, "Channel being checked async");
    assert!(network_graph.read_only().channels().get(&scid).is_none());

    // Both node announcements are held.
    for node_announce in [&node_a_announce, &node_b_announce] {
        assert_eq!(
            network_graph.update_node_from_announcement(node_announce).unwrap_err().err,
            "Awaiting channel_announcement validation to accept node_announcement");
    }

    // Both channel updates are held.
    for chan_update in [&chan_update_a, &chan_update_b] {
        assert_eq!(network_graph.update_channel(chan_update).unwrap_err().err,
            "Awaiting channel_announcement validation to accept channel_update");
    }

    let funding_output = TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script };
    future.resolve_without_forwarding(&network_graph, Ok(funding_output));

    // Both directions of the channel received their update...
    assert!(network_graph.read_only().channels().get(&scid).unwrap().one_to_two.is_some());
    assert!(network_graph.read_only().channels().get(&scid).unwrap().two_to_one.is_some());

    // ...and both nodes received their announcement.
    assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
        .unwrap().announcement_info.is_some());
    assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
        .unwrap().announcement_info.is_some());
}
750
#[test]
#[rustfmt::skip]
fn test_latest_update_async_lookup() {
    // When several updates for the same direction arrive while the lookup is outstanding, only
    // the one with the newest timestamp is applied on resolution.
    let (valid_announcement, chain_source, network_graph, good_script, _,
        _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects();
    let scid = valid_announcement.contents.short_channel_id;

    let future = UtxoFuture::new();
    *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

    let pending_err = network_graph
        .update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
        .unwrap_err();
    assert_eq!(pending_err.err, "Channel being checked async");
    assert!(network_graph.read_only().channels().get(&scid).is_none());

    for chan_update in [&chan_update_a, &chan_update_b, &chan_update_c] {
        assert_eq!(network_graph.update_channel(chan_update).unwrap_err().err,
            "Awaiting channel_announcement validation to accept channel_update");
    }

    let funding_output = TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script };
    future.resolve_without_forwarding(&network_graph, Ok(funding_output));

    // Updates a and b share a timestamp, so differing `last_update` values between the two
    // directions show that update c (timestamp + 1) replaced b.
    assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp);
    let graph_lock = network_graph.read_only();
    let channel = graph_lock.channels().get(&scid).unwrap();
    let one_to_two_update = channel.one_to_two.as_ref().unwrap().last_update;
    let two_to_one_update = channel.two_to_one.as_ref().unwrap().last_update;
    assert!(one_to_two_update != two_to_one_update);
}
786
#[test]
#[rustfmt::skip]
fn test_no_double_lookups() {
    // A pending announcement must not trigger a second lookup when it is seen again, while a
    // different announcement for the same channel does trigger one.
    let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
    let lookup_count = || chain_source.get_utxo_call_count.load(Ordering::Relaxed);

    let future = UtxoFuture::new();
    *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

    let first_err = network_graph
        .update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
        .unwrap_err();
    assert_eq!(first_err.err, "Channel being checked async");
    assert_eq!(lookup_count(), 1);

    // The identical announcement again: rejected as a duplicate, with no new lookup.
    let future_b = UtxoFuture::new();
    *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone());
    let duplicate_err = network_graph
        .update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
        .unwrap_err();
    assert_eq!(duplicate_err.err, "Channel announcement is already being checked");
    assert_eq!(lookup_count(), 1);

    // A different announcement, signed with other keys, does start a second lookup.
    let secp_ctx = Secp256k1::new();
    let replacement_key_1 = SecretKey::from_slice(&[99; 32]).unwrap();
    let replacement_key_2 = SecretKey::from_slice(&[98; 32]).unwrap();
    let invalid_announcement = get_signed_channel_announcement(
        |_| {}, &replacement_key_1, &replacement_key_2, &secp_ctx);
    let replacement_err = network_graph
        .update_channel_from_announcement(&invalid_announcement, &Some(&chain_source))
        .unwrap_err();
    assert_eq!(replacement_err.err, "Channel being checked async");
    assert_eq!(lookup_count(), 2);

    // Resolving the original future still gets the original channel accepted.
    let funding_output = TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script };
    future.resolve_without_forwarding(&network_graph, Ok(funding_output));
    let uses_unknown_feature = network_graph.read_only().channels()
        .get(&valid_announcement.contents.short_channel_id).unwrap()
        .announcement_message.as_ref().unwrap()
        .contents.features.supports_unknown_test_feature();
    assert!(!uses_unknown_feature);
}
829
#[test]
#[rustfmt::skip]
fn test_checks_backpressure() {
    // Backpressure is reported once more than `MAX_PENDING_LOOKUPS` lookups are outstanding and
    // goes away again when the shared future is resolved.
    let secp_ctx = Secp256k1::new();
    let (chain_source, network_graph) = get_network();

    // Every lookup below is answered with a clone of this one future.
    let future = UtxoFuture::new();
    *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

    let privkey_1 = SecretKey::from_slice(&[42; 32]).unwrap();
    let privkey_2 = SecretKey::from_slice(&[41; 32]).unwrap();

    // Exactly `MAX_PENDING_LOOKUPS` pending checks: still below the threshold.
    for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
        let announcement = get_signed_channel_announcement(
            |msg| msg.short_channel_id += 1 + i as u64, &privkey_1, &privkey_2, &secp_ctx);
        network_graph.update_channel_from_announcement(&announcement, &Some(&chain_source)).unwrap_err();
        assert!(!network_graph.pending_checks.too_many_checks_pending());
    }

    // One more pushes it over the limit.
    let extra_announcement = get_signed_channel_announcement(
        |_| {}, &privkey_1, &privkey_2, &secp_ctx);
    network_graph.update_channel_from_announcement(&extra_announcement, &Some(&chain_source)).unwrap_err();
    assert!(network_graph.pending_checks.too_many_checks_pending());

    // Resolving the future clears the backpressure.
    future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
    assert!(!network_graph.pending_checks.too_many_checks_pending());
}
861
#[test]
#[rustfmt::skip]
fn test_checks_backpressure_drop() {
    // Backpressure is reported once more than `MAX_PENDING_LOOKUPS` lookups are outstanding and
    // goes away again when the future is dropped without ever being resolved.
    let secp_ctx = Secp256k1::new();
    let (chain_source, network_graph) = get_network();

    // The future is stored in `utxo_ret` only; the test keeps no handle of its own.
    *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new());

    let privkey_1 = SecretKey::from_slice(&[42; 32]).unwrap();
    let privkey_2 = SecretKey::from_slice(&[41; 32]).unwrap();

    // Exactly `MAX_PENDING_LOOKUPS` pending checks: still below the threshold.
    for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
        let announcement = get_signed_channel_announcement(
            |msg| msg.short_channel_id += 1 + i as u64, &privkey_1, &privkey_2, &secp_ctx);
        network_graph.update_channel_from_announcement(&announcement, &Some(&chain_source)).unwrap_err();
        assert!(!network_graph.pending_checks.too_many_checks_pending());
    }

    // One more pushes it over the limit.
    let extra_announcement = get_signed_channel_announcement(
        |_| {}, &privkey_1, &privkey_2, &secp_ctx);
    network_graph.update_channel_from_announcement(&extra_announcement, &Some(&chain_source)).unwrap_err();
    assert!(network_graph.pending_checks.too_many_checks_pending());

    // Overwriting `utxo_ret` replaces the stored future; the pending entries are then pruned
    // and the backpressure clears.
    *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
    assert!(!network_graph.pending_checks.too_many_checks_pending());
}
893}