use bitcoin::TxOut;
use bitcoin::blockdata::constants::ChainHash;
use hex::DisplayHex;
use crate::events::MessageSendEvent;
use crate::ln::chan_utils::make_funding_redeemscript_from_slices;
use crate::ln::msgs::{self, LightningError, ErrorAction};
use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
use crate::util::logger::{Level, Logger};
use crate::prelude::*;
use alloc::sync::{Arc, Weak};
use crate::sync::{Mutex, LockTestExt};
use core::ops::Deref;
/// Reasons a UTXO lookup for an announced channel can fail.
#[derive(Clone, Debug)]
pub enum UtxoLookupError {
/// Reported when the announcement's chain is not known to the lookup. When checking a
/// channel announcement this is turned into the error
/// "Channel announced on an unknown chain (..)".
UnknownChain,
/// Reported when no matching transaction output was found. When checking a channel
/// announcement this is turned into the error
/// "Channel announced without corresponding UTXO entry".
UnknownTx,
}
/// What [`UtxoLookup::get_utxo`] returns: either an answer that is available immediately or a
/// handle through which the answer is supplied later.
#[derive(Clone)]
pub enum UtxoResult {
/// The lookup already finished; carries the found output or the failure reason.
Sync(Result<TxOut, UtxoLookupError>),
/// The lookup is still in progress. The result is supplied later by calling
/// [`UtxoFuture::resolve`] or [`UtxoFuture::resolve_without_forwarding`] on the contained
/// future (or a clone of it).
Async(UtxoFuture),
}
/// Source of funding-output information used to validate channel announcements.
pub trait UtxoLookup {
/// Looks up the output referenced by `short_channel_id` on the chain identified by
/// `chain_hash`.
///
/// The caller in this file compares the returned output's `script_pubkey` against the
/// funding script derived from the announcement's two bitcoin keys and reports the output's
/// `value` on success.
fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult;
}
/// A channel announcement held while its UTXO lookup is pending, kept in whichever form
/// (signed or unsigned) it arrived in.
enum ChannelAnnouncement {
	/// The complete signed message.
	Full(msgs::ChannelAnnouncement),
	/// Only the unsigned contents.
	Unsigned(msgs::UnsignedChannelAnnouncement),
}
impl ChannelAnnouncement {
	/// Returns the first node id of the announcement, regardless of which form is stored.
	fn node_id_1(&self) -> &NodeId {
		// Both variants ultimately carry the same unsigned contents; normalise first.
		let contents = match self {
			Self::Full(signed) => &signed.contents,
			Self::Unsigned(unsigned) => unsigned,
		};
		&contents.node_id_1
	}
}
/// A node announcement held back while a channel lookup involving that node is pending, kept
/// in whichever form (signed or unsigned) it arrived in.
enum NodeAnnouncement {
	/// The complete signed message.
	Full(msgs::NodeAnnouncement),
	/// Only the unsigned contents.
	Unsigned(msgs::UnsignedNodeAnnouncement),
}
impl NodeAnnouncement {
	/// Returns the announcement's timestamp, regardless of which form is stored.
	fn timestamp(&self) -> u32 {
		let contents = match self {
			Self::Full(signed) => &signed.contents,
			Self::Unsigned(unsigned) => unsigned,
		};
		contents.timestamp
	}
}
/// A channel update held back while the channel's UTXO lookup is pending, kept in whichever
/// form (signed or unsigned) it arrived in.
enum ChannelUpdate {
	/// The complete signed message.
	Full(msgs::ChannelUpdate),
	/// Only the unsigned contents.
	Unsigned(msgs::UnsignedChannelUpdate),
}
impl ChannelUpdate {
	/// Returns the update's timestamp, regardless of which form is stored.
	fn timestamp(&self) -> u32 {
		let contents = match self {
			Self::Full(signed) => &signed.contents,
			Self::Unsigned(unsigned) => unsigned,
		};
		contents.timestamp
	}
}
/// Shared state behind a [`UtxoFuture`]: the lookup result (if it arrived early) plus the gossip
/// messages being held back until the lookup completes.
struct UtxoMessages {
/// The lookup result, stored by `UtxoFuture::do_resolve` when it runs before
/// `channel_announce` has been set. `PendingChecks::check_channel_announcement` takes it out
/// again and handles it in-line.
complete: Option<Result<TxOut, UtxoLookupError>>,
/// The channel announcement waiting on this lookup, set by
/// `PendingChecks::check_channel_announcement`.
channel_announce: Option<ChannelAnnouncement>,
/// Latest (by timestamp) held node announcement whose node id equals the channel
/// announcement's `node_id_1`.
latest_node_announce_a: Option<NodeAnnouncement>,
/// Latest (by timestamp) held node announcement for any other node id.
latest_node_announce_b: Option<NodeAnnouncement>,
/// Latest (by timestamp) held channel update whose `flags` have the low bit set.
latest_channel_update_a: Option<ChannelUpdate>,
/// Latest (by timestamp) held channel update whose `flags` have the low bit clear.
latest_channel_update_b: Option<ChannelUpdate>,
}
/// Handle for a UTXO lookup whose result will be supplied later, returned inside
/// [`UtxoResult::Async`].
///
/// Clones share the same underlying state, so any clone may be used to resolve the lookup.
#[derive(Clone)]
pub struct UtxoFuture {
// Shared with `PendingChecksContext`, which only keeps `Weak` references to it.
state: Arc<Mutex<UtxoMessages>>,
}
/// A [`UtxoLookup`] that answers every query synchronously with one fixed, pre-computed result.
pub(crate) struct UtxoResolver(Result<TxOut, UtxoLookupError>);
impl UtxoLookup for UtxoResolver {
	/// Ignores both arguments and returns a copy of the stored result as [`UtxoResult::Sync`].
	fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult {
		let UtxoResolver(stored) = self;
		UtxoResult::Sync(stored.clone())
	}
}
impl UtxoFuture {
/// Builds a new, unresolved future holding no result and no gossip messages.
pub fn new() -> Self {
	let empty = UtxoMessages {
		complete: None,
		channel_announce: None,
		latest_node_announce_a: None,
		latest_node_announce_b: None,
		latest_channel_update_a: None,
		latest_channel_update_b: None,
	};
	Self { state: Arc::new(Mutex::new(empty)) }
}
/// Resolves this future with `result`, applying the held gossip messages to `graph`.
///
/// Unlike [`Self::resolve`], the broadcast events produced while applying the messages are
/// discarded rather than handed to a gossip sync.
pub fn resolve_without_forwarding<L: Deref>(&self,
	graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
where L::Target: Logger {
	// The returned events are intentionally dropped.
	let _ = self.do_resolve(graph, result);
}
/// Resolves this future with `result`, applying the held gossip messages to `graph` and
/// passing every resulting broadcast event to `gossip` via `forward_gossip_msg`.
pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, U: Deref, GS: Deref<Target = P2PGossipSync<G, U, L>>>(&self,
	graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>
) where L::Target: Logger, U::Target: UtxoLookup {
	let mut events = self.do_resolve(graph, result);
	// Move each populated slot out of the array, in order, and forward it.
	events.iter_mut()
		.filter_map(|slot| slot.take())
		.for_each(|event| gossip.forward_gossip_msg(event));
}
/// Applies the lookup `result` and replays the gossip messages that were held while the lookup
/// was pending.
///
/// Returns up to five broadcast events (one channel announcement, two node announcements, two
/// channel updates). Only signed messages that `graph` accepted produce an event; unsigned
/// messages are applied to `graph` but never returned.
fn do_resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
-> [Option<MessageSendEvent>; 5] where L::Target: Logger {
let (announcement, node_a, node_b, update_a, update_b) = {
// Lock order: the graph's pending-checks lock first, then this future's state. This is the
// same order `PendingChecks::check_channel_announcement` uses.
let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
let mut async_messages = self.state.lock().unwrap();
if async_messages.channel_announce.is_none() {
// No announcement has been stored yet, so there is nothing to replay. Park the result in
// `complete`; `PendingChecks::check_channel_announcement` takes it from there.
async_messages.complete = Some(result);
return [None, None, None, None, None];
}
let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
ChannelAnnouncement::Unsigned(msg) => &msg,
};
// Drop this future's entries from the pending channel/node maps.
pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
// Move all held messages out so they can be processed after both locks are released at the
// end of this block.
(async_messages.channel_announce.take().unwrap(),
async_messages.latest_node_announce_a.take(),
async_messages.latest_node_announce_b.take(),
async_messages.latest_channel_update_a.take(),
async_messages.latest_channel_update_b.take())
};
let mut res = [None, None, None, None, None];
let mut res_idx = 0;
// Re-run the announcement through the graph with a lookup that returns `result`
// synchronously.
let resolver = UtxoResolver(result);
match announcement {
ChannelAnnouncement::Full(signed_msg) => {
if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() {
res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement {
msg: signed_msg, update_msg: None,
});
res_idx += 1;
}
},
ChannelAnnouncement::Unsigned(msg) => {
// Nothing to broadcast for an unsigned message; any error is ignored.
let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
},
}
// Replay the held node announcements (slot a, then slot b).
for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
match announce {
Some(NodeAnnouncement::Full(signed_msg)) => {
if graph.update_node_from_announcement(&signed_msg).is_ok() {
res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement {
msg: signed_msg,
});
res_idx += 1;
}
},
Some(NodeAnnouncement::Unsigned(msg)) => {
let _ = graph.update_node_from_unsigned_announcement(&msg);
},
None => {},
}
}
// Replay the held channel updates (slot a, then slot b).
for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
match update {
Some(ChannelUpdate::Full(signed_msg)) => {
if graph.update_channel(&signed_msg).is_ok() {
res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate {
msg: signed_msg,
});
res_idx += 1;
}
},
Some(ChannelUpdate::Unsigned(msg)) => {
let _ = graph.update_channel_unsigned(&msg);
},
None => {},
}
}
res
}
}
/// Index of in-flight asynchronous UTXO lookups. Only `Weak` references are stored, so a lookup
/// whose `UtxoFuture` clones have all been dropped can be detected via a failed upgrade.
struct PendingChecksContext {
/// Pending lookup state keyed by the announcement's `short_channel_id`.
channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
/// For each node id, the pending lookups whose channel announcement names that node.
nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
}
impl PendingChecksContext {
	/// Removes every reference to `completed_state` that was registered for the channel
	/// announcement `msg`.
	///
	/// The channel entry is only removed when it still points at `completed_state`; a node's
	/// list is removed entirely once it becomes empty.
	fn lookup_completed(&mut self,
		msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak<Mutex<UtxoMessages>>
	) {
		// Another lookup may have replaced the entry for this SCID; leave it alone in that case.
		let channel_is_ours = self.channels.get(&msg.short_channel_id)
			.map_or(false, |pending| Weak::ptr_eq(pending, completed_state));
		if channel_is_ours {
			self.channels.remove(&msg.short_channel_id);
		}

		for node_id in [msg.node_id_1, msg.node_id_2].iter() {
			if let hash_map::Entry::Occupied(mut node_entry) = self.nodes.entry(*node_id) {
				node_entry.get_mut().retain(|pending| !Weak::ptr_eq(pending, completed_state));
				if node_entry.get().is_empty() {
					node_entry.remove();
				}
			}
		}
	}
}
/// Tracks asynchronous UTXO lookups that are in flight, together with the gossip messages held
/// back until they complete.
pub(super) struct PendingChecks {
// All state lives behind one mutex; `UtxoFuture::do_resolve` locks it directly.
internal: Mutex<PendingChecksContext>,
}
impl PendingChecks {
/// Creates a tracker with no pending lookups.
pub(super) fn new() -> Self {
	let context = PendingChecksContext {
		channels: new_hash_map(),
		nodes: new_hash_map(),
	};
	PendingChecks { internal: Mutex::new(context) }
}
pub(super) fn check_hold_pending_channel_update(
&self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
) -> Result<(), LightningError> {
let mut pending_checks = self.internal.lock().unwrap();
if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
let is_from_a = (msg.flags & 1) == 1;
match Weak::upgrade(e.get()) {
Some(msgs_ref) => {
let mut messages = msgs_ref.lock().unwrap();
let latest_update = if is_from_a {
&mut messages.latest_channel_update_a
} else {
&mut messages.latest_channel_update_b
};
if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
*latest_update = Some(
if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
else { ChannelUpdate::Unsigned(msg.clone()) });
}
return Err(LightningError {
err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
action: ErrorAction::IgnoreAndLog(Level::Gossip),
});
},
None => { e.remove(); },
}
}
Ok(())
}
pub(super) fn check_hold_pending_node_announcement(
&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
) -> Result<(), LightningError> {
let mut pending_checks = self.internal.lock().unwrap();
if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
let mut found_at_least_one_chan = false;
e.get_mut().retain(|node_msgs| {
match Weak::upgrade(&node_msgs) {
Some(chan_mtx) => {
let mut chan_msgs = chan_mtx.lock().unwrap();
if let Some(chan_announce) = &chan_msgs.channel_announce {
let latest_announce =
if *chan_announce.node_id_1() == msg.node_id {
&mut chan_msgs.latest_node_announce_a
} else {
&mut chan_msgs.latest_node_announce_b
};
if latest_announce.is_none() ||
latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
{
*latest_announce = Some(
if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
else { NodeAnnouncement::Unsigned(msg.clone()) });
}
found_at_least_one_chan = true;
true
} else {
debug_assert!(false, "channel_announce is set before struct is added to node map");
false
}
},
None => false,
}
});
if e.get().is_empty() { e.remove(); }
if found_at_least_one_chan {
return Err(LightningError {
err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
action: ErrorAction::IgnoreAndLog(Level::Gossip),
});
}
}
Ok(())
}
/// Checks `pending_channels` for an existing lookup with the same SCID as `msg` and optionally
/// installs `replacement` as the entry for that SCID.
///
/// Returns an `IgnoreDuplicateGossip` error when a live pending lookup already holds the very
/// same announcement. In every other case returns `Ok(())` after:
/// * inserting/overwriting the entry with `replacement` when it is `Some`, or
/// * removing a dead entry (and leaving a live, non-matching one untouched) when it is `None`.
fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
) -> Result<(), msgs::LightningError> {
match pending_channels.entry(msg.short_channel_id) {
hash_map::Entry::Occupied(mut e) => {
match Weak::upgrade(&e.get()) {
Some(pending_msgs) => {
// `check_channel_announcement` calls this while already holding the lock of the new
// future's `UtxoMessages`, so a second `UtxoMessages` lock may be taken here.
// NOTE(review): `unsafe_well_ordered_double_lock_self` comes from `LockTestExt`;
// presumably it locks while marking this double-lock as intentional for lock-order
// checks — confirm against its definition.
let pending_matches = match &pending_msgs.unsafe_well_ordered_double_lock_self().channel_announce {
// A signed pending message only matches an identical signed message.
Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
None => {
// Not expected: the debug assertion flags it, release builds treat it as
// "does not match".
debug_assert!(false);
false
},
};
if pending_matches {
return Err(LightningError {
err: "Channel announcement is already being checked".to_owned(),
action: ErrorAction::IgnoreDuplicateGossip,
});
} else {
// A different announcement for the same SCID is pending: point the entry at the
// new lookup if there is one, otherwise keep the existing entry.
if let Some(item) = replacement {
*e.get_mut() = item;
}
}
},
None => {
// The earlier lookup's state no longer exists; replace or drop the stale entry.
if let Some(item) = replacement {
*e.get_mut() = item;
} else { e.remove(); }
},
}
},
hash_map::Entry::Vacant(v) => {
if let Some(item) = replacement { v.insert(item); }
},
}
Ok(())
}
/// Validates a channel announcement against the UTXO it claims to be funded by.
///
/// Returns:
/// * `Ok(None)` when `utxo_lookup` is `None` (no on-chain check is performed),
/// * `Ok(Some(value))` when the lookup returned an output whose script matches the funding
///   script derived from the announcement's bitcoin keys,
/// * `Err(..)` when the same announcement is already being checked, when the lookup failed or
///   the script did not match, or when the lookup is asynchronous and still pending (in which
///   case the announcement is stored to be replayed once the future is resolved).
pub(super) fn check_channel_announcement<U: Deref>(&self,
utxo_lookup: &Option<U>, msg: &msgs::UnsignedChannelAnnouncement,
full_msg: Option<&msgs::ChannelAnnouncement>
) -> Result<Option<u64>, msgs::LightningError> where U::Target: UtxoLookup {
// Converts a finished lookup result into this function's return value.
let handle_result = |res| {
match res {
Ok(TxOut { value, script_pubkey }) => {
let expected_script =
make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_array(), msg.bitcoin_key_2.as_array()).to_v0_p2wsh();
if script_pubkey != expected_script {
return Err(LightningError{
err: format!("Channel announcement key ({}) didn't match on-chain script ({})",
expected_script.to_hex_string(), script_pubkey.to_hex_string()),
action: ErrorAction::IgnoreError
});
}
Ok(Some(value))
},
Err(UtxoLookupError::UnknownChain) => {
Err(LightningError {
err: format!("Channel announced on an unknown chain ({})",
msg.chain_hash.to_bytes().as_hex()),
action: ErrorAction::IgnoreError
})
},
Err(UtxoLookupError::UnknownTx) => {
Err(LightningError {
err: "Channel announced without corresponding UTXO entry".to_owned(),
action: ErrorAction::IgnoreError
})
},
}
};
// Reject duplicates of an announcement that is already being checked before doing any
// lookup. The lock taken here is a temporary that is released at the end of the statement,
// i.e. before `get_utxo` is called.
Self::check_replace_previous_entry(msg, full_msg, None,
&mut self.internal.lock().unwrap().channels)?;
match utxo_lookup {
&None => {
// Without a lookup source nothing can be verified; no value is reported.
Ok(None)
},
&Some(ref utxo_lookup) => {
match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
UtxoResult::Sync(res) => handle_result(res),
UtxoResult::Async(future) => {
// Same lock order as `UtxoFuture::do_resolve`: pending checks, then future state.
let mut pending_checks = self.internal.lock().unwrap();
let mut async_messages = future.state.lock().unwrap();
if let Some(res) = async_messages.complete.take() {
// The future was resolved before the announcement was stored; handle it in-line.
handle_result(res)
} else {
// Register the new lookup under the SCID (failing if the same announcement got
// registered in the meantime), store the announcement, and index the lookup
// under both node ids so node announcements can be held as well.
Self::check_replace_previous_entry(msg, full_msg,
Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?;
async_messages.channel_announce = Some(
if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
else { ChannelAnnouncement::Unsigned(msg.clone()) });
pending_checks.nodes.entry(msg.node_id_1)
.or_insert(Vec::new()).push(Arc::downgrade(&future.state));
pending_checks.nodes.entry(msg.node_id_2)
.or_insert(Vec::new()).push(Arc::downgrade(&future.state));
Err(LightningError {
err: "Channel being checked async".to_owned(),
action: ErrorAction::IgnoreAndLog(Level::Gossip),
})
}
},
}
}
}
}
const MAX_PENDING_LOOKUPS: usize = 32;
pub(super) fn too_many_checks_pending(&self) -> bool {
let mut pending_checks = self.internal.lock().unwrap();
if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
pending_checks.channels.retain(|_, chan| {
Weak::upgrade(&chan).is_some()
});
pending_checks.nodes.retain(|_, channels| {
channels.retain(|chan| Weak::upgrade(&chan).is_some());
!channels.is_empty()
});
pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
} else {
false
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::routing::gossip::tests::*;
use crate::util::test_utils::{TestChainSource, TestLogger};
use bitcoin::secp256k1::{Secp256k1, SecretKey};
use core::sync::atomic::Ordering;
/// Builds a testnet chain source and an empty testnet network graph with a fresh test logger.
fn get_network() -> (TestChainSource, NetworkGraph<Box<TestLogger>>) {
	let test_logger = Box::new(TestLogger::new());
	let chain_source = TestChainSource::new(bitcoin::Network::Testnet);
	(chain_source, NetworkGraph::new(bitcoin::Network::Testnet, test_logger))
}
/// Builds the fixtures shared by the tests below, in this order: a signed channel announcement,
/// the chain source, the network graph, the channel script from `get_channel_script`, one
/// signed node announcement per node, and three signed channel updates (`flags = 0` from node
/// 1, `flags = 1` from node 2, and a second `flags = 1` update from node 2 whose timestamp is
/// one higher).
fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource,
	NetworkGraph<Box<TestLogger>>, bitcoin::ScriptBuf, msgs::NodeAnnouncement,
	msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate)
{
	let secp_ctx = Secp256k1::new();
	let (chain_source, network_graph) = get_network();
	let good_script = get_channel_script(&secp_ctx);

	let key_1 = SecretKey::from_slice(&[42; 32]).unwrap();
	let key_2 = SecretKey::from_slice(&[41; 32]).unwrap();

	let valid_announcement = get_signed_channel_announcement(|_| {}, &key_1, &key_2, &secp_ctx);
	let node_a_announce = get_signed_node_announcement(|_| {}, &key_1, &secp_ctx);
	let node_b_announce = get_signed_node_announcement(|_| {}, &key_2, &secp_ctx);

	let chan_update_a = get_signed_channel_update(|msg| msg.flags = 0, &key_1, &secp_ctx);
	let chan_update_b = get_signed_channel_update(|msg| msg.flags = 1, &key_2, &secp_ctx);
	let chan_update_c = get_signed_channel_update(
		|msg| {
			msg.flags = 1;
			msg.timestamp += 1;
		},
		&key_2, &secp_ctx);

	(valid_announcement, chain_source, network_graph, good_script, node_a_announce,
		node_b_announce, chan_update_a, chan_update_b, chan_update_c)
}
#[test]
fn test_fast_async_lookup() {
	// A future that is resolved before the lookup source even returns it must still lead to the
	// channel being accepted.
	let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
	let scid = valid_announcement.contents.short_channel_id;

	let future = UtxoFuture::new();
	let funding_output = TxOut { value: 1_000_000, script_pubkey: good_script };
	future.resolve_without_forwarding(&network_graph, Ok(funding_output));
	*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

	network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap();
	assert!(network_graph.read_only().channels().get(&scid).is_some());
}
#[test]
fn test_async_lookup() {
	// An announcement with an asynchronous lookup is held until the future resolves, after which
	// the channel is in the graph and node announcements are accepted normally.
	let (valid_announcement, chain_source, network_graph, good_script,
		node_a_announce, node_b_announce, ..) = get_test_objects();
	let scid = valid_announcement.contents.short_channel_id;

	let future = UtxoFuture::new();
	*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

	assert_eq!(
		network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
		"Channel being checked async");
	assert!(network_graph.read_only().channels().get(&scid).is_none());

	future.resolve_without_forwarding(&network_graph,
		Ok(TxOut { value: 0, script_pubkey: good_script }));
	// The original performed this lookup twice in a row via a bare `unwrap()`; a single
	// explicit assertion checks the same thing.
	assert!(network_graph.read_only().channels().get(&scid).is_some());

	// No node announcement was held during the lookup, so none is known yet...
	assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
		.unwrap().announcement_info.is_none());

	// ...and with the lookup finished, node announcements are applied directly.
	network_graph.update_node_from_announcement(&node_a_announce).unwrap();
	network_graph.update_node_from_announcement(&node_b_announce).unwrap();

	assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
		.unwrap().announcement_info.is_some());
}
#[test]
fn test_invalid_async_lookup() {
	// Resolving the future with an output whose script does not match must not add the channel.
	let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
	let scid = valid_announcement.contents.short_channel_id;

	let future = UtxoFuture::new();
	*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

	let pending_err = network_graph
		.update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
		.unwrap_err();
	assert_eq!(pending_err.err, "Channel being checked async");
	assert!(network_graph.read_only().channels().get(&scid).is_none());

	let wrong_output = TxOut { value: 1_000_000, script_pubkey: bitcoin::ScriptBuf::new() };
	future.resolve_without_forwarding(&network_graph, Ok(wrong_output));
	assert!(network_graph.read_only().channels().get(&scid).is_none());
}
#[test]
fn test_failing_async_lookup() {
	// Resolving the future with a lookup error must not add the channel.
	let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
	let scid = valid_announcement.contents.short_channel_id;

	let future = UtxoFuture::new();
	*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

	let pending_err = network_graph
		.update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
		.unwrap_err();
	assert_eq!(pending_err.err, "Channel being checked async");
	assert!(network_graph.read_only().channels().get(&scid).is_none());

	future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
	assert!(network_graph.read_only().channels().get(&scid).is_none());
}
#[test]
fn test_updates_async_lookup() {
	// Node announcements and channel updates received during the lookup are held and then
	// applied once the future resolves.
	let (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
		node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects();
	let scid = valid_announcement.contents.short_channel_id;

	let future = UtxoFuture::new();
	*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

	let pending_err = network_graph
		.update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
		.unwrap_err();
	assert_eq!(pending_err.err, "Channel being checked async");
	assert!(network_graph.read_only().channels().get(&scid).is_none());

	for node_announce in [&node_a_announce, &node_b_announce].iter() {
		assert_eq!(
			network_graph.update_node_from_announcement(*node_announce).unwrap_err().err,
			"Awaiting channel_announcement validation to accept node_announcement");
	}
	for chan_update in [&chan_update_a, &chan_update_b].iter() {
		assert_eq!(network_graph.update_channel(*chan_update).unwrap_err().err,
			"Awaiting channel_announcement validation to accept channel_update");
	}

	future.resolve_without_forwarding(&network_graph,
		Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));

	let graph_lock = network_graph.read_only();
	let channel = graph_lock.channels().get(&scid).unwrap();
	assert!(channel.one_to_two.is_some());
	assert!(channel.two_to_one.is_some());

	let node_1 = graph_lock.nodes().get(&valid_announcement.contents.node_id_1).unwrap();
	assert!(node_1.announcement_info.is_some());
	let node_2 = graph_lock.nodes().get(&valid_announcement.contents.node_id_2).unwrap();
	assert!(node_2.announcement_info.is_some());
}
#[test]
fn test_latest_update_async_lookup() {
	// When several updates arrive for the same slot during the lookup, the one with the newest
	// timestamp is the one applied afterwards.
	let (valid_announcement, chain_source, network_graph, good_script, _,
		_, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects();
	let scid = valid_announcement.contents.short_channel_id;

	let future = UtxoFuture::new();
	*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

	let pending_err = network_graph
		.update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
		.unwrap_err();
	assert_eq!(pending_err.err, "Channel being checked async");
	assert!(network_graph.read_only().channels().get(&scid).is_none());

	for chan_update in [&chan_update_a, &chan_update_b, &chan_update_c].iter() {
		assert_eq!(network_graph.update_channel(*chan_update).unwrap_err().err,
			"Awaiting channel_announcement validation to accept channel_update");
	}

	future.resolve_without_forwarding(&network_graph,
		Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));

	// Updates a and b share a timestamp, so differing `last_update` values in the two
	// directions show that update c (timestamp + 1) replaced update b.
	assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp);
	let graph_lock = network_graph.read_only();
	let channel = graph_lock.channels().get(&scid).unwrap();
	let one_to_two_ts = channel.one_to_two.as_ref().unwrap().last_update;
	let two_to_one_ts = channel.two_to_one.as_ref().unwrap().last_update;
	assert!(one_to_two_ts != two_to_one_ts);
}
#[test]
fn test_no_double_lookups() {
	// A repeated announcement must not trigger a second lookup, while a different announcement
	// for the same SCID must.
	let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
	let lookup_calls = || chain_source.get_utxo_call_count.load(Ordering::Relaxed);

	let future = UtxoFuture::new();
	*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

	let first_err = network_graph
		.update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
		.unwrap_err();
	assert_eq!(first_err.err, "Channel being checked async");
	assert_eq!(lookup_calls(), 1);

	// Same message again: rejected as a duplicate before the lookup source is consulted.
	let future_b = UtxoFuture::new();
	*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone());
	let duplicate_err = network_graph
		.update_channel_from_announcement(&valid_announcement, &Some(&chain_source))
		.unwrap_err();
	assert_eq!(duplicate_err.err, "Channel announcement is already being checked");
	assert_eq!(lookup_calls(), 1);

	// A different announcement (signed by other keys) starts a new lookup.
	let secp_ctx = Secp256k1::new();
	let other_key_1 = SecretKey::from_slice(&[99; 32]).unwrap();
	let other_key_2 = SecretKey::from_slice(&[98; 32]).unwrap();
	let invalid_announcement = get_signed_channel_announcement(|_| {}, &other_key_1, &other_key_2, &secp_ctx);
	let replaced_err = network_graph
		.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source))
		.unwrap_err();
	assert_eq!(replaced_err.err, "Channel being checked async");
	assert_eq!(lookup_calls(), 2);

	// Resolving the first future still results in the original channel being accepted.
	future.resolve_without_forwarding(&network_graph,
		Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
	let graph_lock = network_graph.read_only();
	let channel = graph_lock.channels().get(&valid_announcement.contents.short_channel_id).unwrap();
	assert!(!channel.announcement_message.as_ref().unwrap()
		.contents.features.supports_unknown_test_feature());
}
#[test]
fn test_checks_backpressure() {
	// Backpressure kicks in only once more than MAX_PENDING_LOOKUPS lookups are pending, and is
	// lifted again when a lookup is resolved.
	let secp_ctx = Secp256k1::new();
	let (chain_source, network_graph) = get_network();

	// Every announcement below is handed a clone of this one future.
	let future = UtxoFuture::new();
	*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());

	let key_1 = SecretKey::from_slice(&[42; 32]).unwrap();
	let key_2 = SecretKey::from_slice(&[41; 32]).unwrap();

	for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
		let scid_offset = 1 + i as u64;
		let announcement = get_signed_channel_announcement(
			|msg| msg.short_channel_id += scid_offset, &key_1, &key_2, &secp_ctx);
		network_graph.update_channel_from_announcement(&announcement, &Some(&chain_source)).unwrap_err();
		assert!(!network_graph.pending_checks.too_many_checks_pending());
	}

	// One more pending lookup pushes the count over the limit.
	let last_announcement = get_signed_channel_announcement(|_| {}, &key_1, &key_2, &secp_ctx);
	network_graph.update_channel_from_announcement(&last_announcement, &Some(&chain_source)).unwrap_err();
	assert!(network_graph.pending_checks.too_many_checks_pending());

	// Resolving the future frees a slot again.
	future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
	assert!(!network_graph.pending_checks.too_many_checks_pending());
}
#[test]
fn test_checks_backpressure_drop() {
	// Backpressure is lifted when the pending future is dropped without ever being resolved.
	let secp_ctx = Secp256k1::new();
	let (chain_source, network_graph) = get_network();

	// The only long-lived handle to the future is the one stored in the chain source.
	*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new());

	let key_1 = SecretKey::from_slice(&[42; 32]).unwrap();
	let key_2 = SecretKey::from_slice(&[41; 32]).unwrap();

	for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
		let scid_offset = 1 + i as u64;
		let announcement = get_signed_channel_announcement(
			|msg| msg.short_channel_id += scid_offset, &key_1, &key_2, &secp_ctx);
		network_graph.update_channel_from_announcement(&announcement, &Some(&chain_source)).unwrap_err();
		assert!(!network_graph.pending_checks.too_many_checks_pending());
	}

	// One more pending lookup pushes the count over the limit.
	let last_announcement = get_signed_channel_announcement(|_| {}, &key_1, &key_2, &secp_ctx);
	network_graph.update_channel_from_announcement(&last_announcement, &Some(&chain_source)).unwrap_err();
	assert!(network_graph.pending_checks.too_many_checks_pending());

	// Overwriting the stored result drops the future held by the chain source.
	*chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
	assert!(!network_graph.pending_checks.too_many_checks_pending());
}
}