#![cfg(test)]
pub mod peer;
mod simulations;
use std::io;
use std::iter;
use std::net;
use std::ops::Bound;
use std::sync::Arc;
use log::*;
use nakamoto_common::bitcoin::network::message_blockdata::GetHeadersMessage;
use super::{addrmgr, cbfmgr, invmgr, peermgr, pingmgr, syncmgr};
use super::{
chan, network::Network, BlockHash, BlockHeader, Command, Config, DisconnectReason, Event,
HashSet, Height, Io, Limits, NetworkMessage, PeerId, RawNetworkMessage, ServiceFlags,
VersionMessage,
};
use super::{PROTOCOL_VERSION, USER_AGENT};
use peer::{Peer, PeerDummy};
use nakamoto_common::bitcoin::network::message_blockdata::Inventory;
use nakamoto_common::bitcoin::network::message_filter::CFilter;
use nakamoto_common::bitcoin::network::message_filter::{CFHeaders, GetCFHeaders, GetCFilters};
use nakamoto_common::bitcoin::network::Address;
use nakamoto_common::bitcoin_hashes::hex::FromHex;
use nakamoto_common::block::time::Clock as _;
use nakamoto_net::simulator::{Options, Peer as _, Simulation};
use nakamoto_net::{Link, LocalDuration, LocalTime, StateMachine as _};
use quickcheck_macros::quickcheck;
use nakamoto_chain::block::cache::BlockCache;
use nakamoto_chain::block::store;
use nakamoto_chain::store::Genesis;
use nakamoto_common::block::filter::FilterHeader;
use nakamoto_common::block::time::{AdjustedTime, RefClock};
use nakamoto_common::block::tree::BlockReader as _;
use nakamoto_common::collections::HashMap;
use nakamoto_common::nonempty::NonEmpty;
use nakamoto_common::p2p::peer::KnownAddress;
use nakamoto_common::p2p::peer::Source;
use nakamoto_test::arbitrary;
use nakamoto_test::assert_matches;
use nakamoto_test::block::cache::model;
use nakamoto_test::block::gen;
use nakamoto_test::BITCOIN_HEADERS;
#[allow(unused_imports)]
use nakamoto_test::logger;
pub type Protocol = super::StateMachine<
BlockCache<store::Memory<BlockHeader>>,
model::FilterCache,
HashMap<net::IpAddr, KnownAddress>,
RefClock<AdjustedTime<PeerId>>,
>;
mod setup {
use super::*;
#[allow(dead_code)]
pub fn addresses(count: usize, rng: fastrand::Rng) -> Vec<net::SocketAddr> {
let mut addrs = Vec::with_capacity(count);
while addrs.len() < count {
let addr: net::SocketAddr = (
[rng.u8(..), rng.u8(..), rng.u8(..), rng.u8(..)],
rng.u16(1024..),
)
.into();
if !addrmgr::is_routable(&addr.ip()) {
continue;
}
if addrs.iter().any(|a| addr == *a) {
continue;
}
addrs.push(addr);
}
addrs
}
}
#[test]
fn test_handshake() {
let rng = fastrand::Rng::new();
let network = Network::Mainnet;
let mut peer = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng);
let outbound = ([241, 19, 44, 19], 8333).into();
let inbound = ([241, 19, 44, 18], 8333).into();
peer.connect_addr(&inbound, Link::Inbound);
peer.connect_addr(&outbound, Link::Outbound);
}
#[test]
fn test_initial_sync() {
let rng = fastrand::Rng::new();
let height = 144;
let network = Network::Mainnet;
let headers = BITCOIN_HEADERS.tail[0..height].to_vec();
let time = LocalTime::from_block_time(headers.last().unwrap().time);
assert!(headers.len() >= height);
let mut alice = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng.clone());
let mut bob = Peer::new(
"bob",
[97, 97, 97, 97],
network,
headers,
vec![],
vec![],
rng.clone(),
);
bob.protocol.syncmgr.config.max_message_headers = 10;
assert_eq!(bob.protocol.tree.height(), height as Height);
alice.command(Command::Connect(bob.addr));
let mut simulation =
Simulation::new(time, rng, Options::default()).initialize([&mut alice, &mut bob]);
while simulation.step([&mut alice, &mut bob]) {
if alice.protocol.tree.height() == height as Height {
break;
}
}
}
#[test]
fn test_idle_disconnect() {
let rng = fastrand::Rng::new();
let network = Network::Mainnet;
let mut peer = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng);
let remote = ([241, 19, 44, 18], 8333).into();
peer.connect_addr(&remote, Link::Outbound);
peer.elapse(pingmgr::PING_INTERVAL);
peer.messages(&remote)
.find(|o| matches!(o, NetworkMessage::Ping(_)))
.expect("`ping` is sent");
peer.elapse(pingmgr::PING_TIMEOUT);
peer.outputs()
.find(|o| matches!(o, Io::Disconnect(addr, DisconnectReason::PeerTimeout("ping")) if addr == &remote))
.expect("peer disconnects remote");
}
#[test]
fn test_inv_getheaders() {
let rng = fastrand::Rng::new();
let network = Network::Mainnet;
let mut peer = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng);
let remote: PeerId = ([241, 19, 44, 18], 8333).into();
let hash =
BlockHash::from_hex("0000000000b7b2c71f2a345e3a4fc328bf5bbb436012afca590b1a11466e2206")
.unwrap();
peer.connect_addr(&remote, Link::Outbound);
peer.received(&remote, NetworkMessage::Inv(vec![Inventory::Block(hash)]));
peer.messages(&remote)
.find(|o| matches!(o, NetworkMessage::GetHeaders(_)))
.expect("a `getheaders` message should be returned");
peer.outputs()
.find(|o| matches!(o, Io::SetTimer(_)))
.expect("a timer should be returned");
}
#[test]
fn test_bad_magic() {
let rng = fastrand::Rng::new();
let network = Network::Mainnet;
let mut peer = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng);
let remote: PeerId = ([241, 19, 44, 18], 8333).into();
peer.connect_addr(&remote, Link::Outbound);
peer.received_raw(
&remote,
RawNetworkMessage {
magic: 999,
payload: NetworkMessage::Ping(1),
},
);
peer.outputs()
.find(|o| matches!(o, Io::Disconnect(addr, DisconnectReason::PeerMagic(999)) if addr == &remote))
.expect("peer should be disconnected");
}
#[test]
fn test_maintain_connections() {
let rng = fastrand::Rng::new();
let network = Network::Mainnet;
let port = network.port();
let mut alice = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng);
let time = alice.local_time().block_time();
let peers: Vec<PeerId> = vec![
([88, 88, 88, 1], 8333).into(),
([88, 88, 88, 2], 8333).into(),
([88, 88, 88, 3], 8333).into(),
];
let mut addrs: HashSet<_> = vec![
([77, 77, 77, 77], port).into(),
([78, 78, 78, 78], port).into(),
([79, 79, 79, 79], port).into(),
]
.into_iter()
.collect();
for peer in peers.iter() {
alice.connect_addr(peer, Link::Outbound);
alice.protocol.peermgr.is_connected(peer);
}
for addr in addrs.iter() {
let addr = Address::new(addr, alice.protocol.peermgr.config.required_services);
alice
.protocol
.addrmgr
.insert(vec![(time, addr)].into_iter(), Source::Dns);
}
for peer in peers.iter() {
alice.disconnected(peer, DisconnectReason::PeerTimeout("timeout").into());
let addr = alice
.outputs()
.find_map(|o| match o {
Io::Connect(addr) => Some(addr),
_ => None,
})
.expect("Alice connects to a new peer");
assert!(addr != *peer);
assert!(addrs.remove(&addr));
}
assert!(addrs.is_empty());
}
#[test]
fn test_getheaders_retry() {
let rng = fastrand::Rng::new();
let network = Network::Mainnet;
let hash =
BlockHash::from_hex("0000000000b7b2c71f2a345e3a4fc328bf5bbb436012afca590b1a11466e2206")
.unwrap();
let mut alice = Peer::genesis("alice", [49, 40, 43, 40], network, vec![], rng);
let peers = [
([55, 55, 55, 55], network.port()).into(),
([66, 66, 66, 66], network.port()).into(),
([77, 77, 77, 77], network.port()).into(),
];
for peer in peers.iter() {
alice.connect(
&PeerDummy {
addr: *peer,
height: 0, protocol_version: PROTOCOL_VERSION,
services: syncmgr::REQUIRED_SERVICES,
relay: true,
time: alice.local_time(),
},
Link::Outbound,
);
}
assert_eq!(
alice.protocol.syncmgr.best_height().unwrap(),
alice.protocol.tree.height()
);
let mut asked = HashSet::new();
alice.received(&peers[0], NetworkMessage::Inv(vec![Inventory::Block(hash)]));
alice
.messages(&peers[0])
.find(|m| {
matches!(
m,
NetworkMessage::GetHeaders(GetHeadersMessage { stop_hash, .. })
if stop_hash == &hash
)
})
.expect("Alice asks the first peer for headers");
asked.insert(peers[0]);
while asked.len() < peers.len() {
alice.elapse(syncmgr::REQUEST_TIMEOUT);
let addr = peers
.iter()
.find(|peer| {
alice.messages(peer).any(|m| {
matches!(
m,
NetworkMessage::GetHeaders(GetHeadersMessage { stop_hash, .. })
if stop_hash == hash
)
})
})
.expect("Alice asks the next peer for headers");
assert!(
!asked.contains(addr),
"Alice shouldn't ask the same peer twice"
);
asked.insert(*addr);
}
}
#[test]
fn test_handshake_version_timeout() {
let network = Network::Mainnet;
let remote = ([131, 31, 11, 33], 11111).into();
let rng = fastrand::Rng::new();
let mut peer = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng);
logger::init(Level::Debug);
peer.init();
for link in &[Link::Outbound, Link::Inbound] {
if link.is_outbound() {
peer.protocol.peermgr.connect(&remote);
}
peer.protocol.connected(remote, &peer.addr, *link);
peer.outputs()
.find(|o| matches!(o, Io::SetTimer(_)))
.expect("a timer should be returned");
peer.elapse(peermgr::HANDSHAKE_TIMEOUT);
peer.outputs()
.find(|o| {
matches!(o, Io::Disconnect(a, DisconnectReason::PeerTimeout("handshake")) if a == &remote)
})
.expect("peer should disconnect when no `version` is received");
peer.disconnected(&remote, DisconnectReason::PeerTimeout("test").into());
}
}
#[test]
fn test_handshake_verack_timeout() {
let network = Network::Mainnet;
let rng = fastrand::Rng::new();
let mut peer = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng);
let remote = PeerDummy::new([131, 31, 11, 33], network, 144, ServiceFlags::NETWORK);
peer.init();
for link in &[Link::Outbound, Link::Inbound] {
if link.is_outbound() {
peer.protocol.peermgr.connect(&remote.addr);
}
peer.protocol.connected(remote.addr, &peer.addr, *link);
peer.received(
&remote.addr,
NetworkMessage::Version(remote.version(peer.addr, 0)),
);
peer.outputs()
.find(|o| matches!(o, Io::SetTimer(_)))
.expect("a timer should be returned");
peer.elapse(LocalDuration::from_secs(60));
peer.outputs()
.find(|o| {
matches!(o, Io::Disconnect(a, DisconnectReason::PeerTimeout("handshake")) if a == &remote.addr)
})
.expect("peer should disconnect if no `verack` is received");
peer.disconnected(&remote.addr, DisconnectReason::PeerTimeout("verack").into());
}
}
#[test]
fn test_handshake_version_hook() {
let network = Network::Mainnet;
let rng = fastrand::Rng::new();
let mut cfg = Config::default();
cfg.hooks.on_version = Arc::new(|_, version: VersionMessage| {
if version.user_agent.contains("craig") {
return Err("craig is not satoshi");
}
Ok(())
});
let mut peer = Peer::config("alice", [48, 48, 48, 48], vec![], vec![], vec![], cfg, rng);
let craig = PeerDummy::new([131, 31, 11, 33], network, 144, ServiceFlags::NETWORK);
let satoshi = PeerDummy::new([131, 31, 11, 66], network, 144, ServiceFlags::NETWORK);
peer.protocol
.connected(craig.addr, &peer.addr, Link::Inbound);
peer.received(
&craig.addr,
NetworkMessage::Version(VersionMessage {
user_agent: "/craig:0.1.0/".to_owned(),
..craig.version(peer.addr, 0)
}),
);
peer.outputs()
.find(|o| matches!(o, Io::Disconnect(a, _) if a == &craig.addr))
.expect("peer should disconnect when the 'on_version' hook returns an error");
peer.protocol
.connected(satoshi.addr, &peer.addr, Link::Inbound);
peer.received(
&satoshi.addr,
NetworkMessage::Version(VersionMessage {
user_agent: "satoshi".to_owned(),
..satoshi.version(peer.addr, 0)
}),
);
peer.messages(&satoshi.addr)
.find(|m| matches!(m, NetworkMessage::Verack))
.expect("peer should send a 'verack' message back");
}
#[test]
fn test_handshake_initial_messages() {
let rng = fastrand::Rng::new();
let network = Network::Mainnet;
let mut peer = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng);
let remote = PeerDummy::new([131, 31, 11, 33], network, 144, ServiceFlags::NETWORK);
let local = ([0, 0, 0, 0], 0).into();
peer.init();
peer.protocol.addrmgr.insert(
std::iter::once((
peer.local_time().block_time(),
Address::new(&remote.addr, ServiceFlags::NETWORK),
)),
Source::Dns,
);
peer.protocol.peermgr.connect(&remote.addr);
peer.connected(remote.addr, &local, Link::Outbound);
peer.received(
&remote.addr,
NetworkMessage::Version(remote.version(local, 0)),
);
peer.received(&remote.addr, NetworkMessage::Verack);
let msgs = peer.messages(&remote.addr).collect::<Vec<_>>();
assert!(msgs.contains(&NetworkMessage::SendHeaders));
assert!(msgs.contains(&NetworkMessage::GetAddr));
assert!(msgs
.iter()
.any(|msg| matches!(msg, NetworkMessage::Ping(_))));
}
#[test]
fn test_connection_error() {
let network = Network::Mainnet;
let rng = fastrand::Rng::new();
let mut peer = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng);
let remote = PeerDummy::new([131, 31, 11, 33], network, 144, ServiceFlags::NETWORK);
peer.command(Command::Connect(remote.addr));
peer.outputs()
.find(|o| matches!(o, Io::Connect(addr) if addr == &remote.addr))
.expect("Alice should try to connect to remote");
peer.attempted(&remote.addr);
peer.disconnected(
&remote.addr,
nakamoto_net::Disconnect::ConnectionError(
io::Error::from(io::ErrorKind::UnexpectedEof).into(),
),
);
}
#[test]
fn test_getaddr() {
let rng = fastrand::Rng::new();
let network = Network::Mainnet;
let mut alice = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng);
let bob: PeerId = ([241, 19, 44, 18], 8333).into();
let eve: PeerId = ([241, 19, 44, 19], 8333).into();
alice.connect_addr(&bob, Link::Outbound);
alice.connect_addr(&eve, Link::Outbound);
alice.disconnected(&bob, DisconnectReason::Command.into());
alice.elapse(addrmgr::REQUEST_TIMEOUT);
alice
.events()
.find(|e| matches!(e, Event::Address(addrmgr::Event::AddressBookExhausted)))
.expect("Alice should emit `AddressBookExhausted`");
alice.tock();
alice
.messages(&eve)
.find(|msg| matches!(msg, NetworkMessage::GetAddr))
.expect("Alice should send `getaddr`");
let toto: net::SocketAddr = ([14, 45, 16, 57], 8333).into();
alice.received(
&eve,
NetworkMessage::Addr(vec![(
alice.local_time().block_time(),
Address::new(&toto, ServiceFlags::NETWORK),
)]),
);
alice.elapse(peermgr::IDLE_TIMEOUT);
alice
.outputs()
.find(|o| matches!(o, Io::Connect(addr) if addr == &toto))
.expect("Alice tries to connect to Toto");
}
#[test]
fn test_stale_tip() {
let rng = fastrand::Rng::new();
let network = Network::Mainnet;
let mut alice = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng);
let remote: PeerId = ([33, 33, 33, 33], network.port()).into();
let headers = &BITCOIN_HEADERS;
alice.connect_addr(&remote, Link::Outbound);
alice.received(
&remote,
NetworkMessage::Headers(vec![*headers
.get(alice.protocol.tree.height() as usize + 1)
.unwrap()]),
);
alice
.messages(&remote)
.find(|msg| matches!(msg, NetworkMessage::GetHeaders(_)))
.expect("Alice sends a `getheaders` message");
alice.elapse(syncmgr::REQUEST_TIMEOUT);
alice.elapse(syncmgr::TIP_STALE_DURATION);
alice
.events()
.find(|e| matches!(e, Event::Chain(syncmgr::Event::StaleTip(_))))
.expect("Alice emits a `StaleTip` event");
alice.elapse(syncmgr::REQUEST_TIMEOUT);
alice.received(
&remote,
NetworkMessage::Headers(vec![*headers
.get(alice.protocol.tree.height() as usize + 1)
.unwrap()]),
);
alice.elapse(syncmgr::TIP_STALE_DURATION);
alice
.events()
.find(|e| matches!(e, Event::Chain(syncmgr::Event::StaleTip(_))))
.expect("Alice emits a `StaleTip` event");
}
#[quickcheck]
fn prop_addrs(seed: u64) {
let rng = fastrand::Rng::with_seed(seed);
let network = Network::Mainnet;
let mut alice = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng);
let bob: PeerId = ([241, 19, 44, 18], 8333).into();
alice.connect_addr(&bob, Link::Outbound);
let jak: PeerId = ([88, 13, 16, 59], 8333).into();
let jim: PeerId = ([99, 45, 180, 58], 8333).into();
let jon: PeerId = ([14, 48, 141, 57], 8333).into();
let time = alice.local_time().block_time();
alice.received(
&bob,
NetworkMessage::Addr(vec![
(time, Address::new(&jak, ServiceFlags::NETWORK)),
(time, Address::new(&jim, ServiceFlags::NETWORK)),
(time, Address::new(&jon, ServiceFlags::NETWORK)),
]),
);
alice.received(&bob, NetworkMessage::GetAddr);
let msg = alice
.messages(&bob)
.find(|o| matches!(o, NetworkMessage::Addr(_)))
.expect("peer should respond with `addr`");
let addrs = match msg {
NetworkMessage::Addr(addrs) => addrs,
_ => unreachable!(),
};
assert_eq!(addrs.len(), 3);
let addrs: HashSet<net::SocketAddr> = addrs
.iter()
.map(|(_, a)| a.socket_addr().unwrap())
.collect();
assert!(addrs.contains(&jak));
assert!(addrs.contains(&jim));
assert!(addrs.contains(&jon));
assert_eq!(addrs.len(), 3);
}
#[quickcheck]
fn prop_connect_timeout(seed: u64) {
let rng = fastrand::Rng::with_seed(seed);
let network = Network::Mainnet;
let config = Config {
limits: Limits {
max_outbound_peers: 3,
..Limits::default()
},
connect: vec![
([77, 77, 77, 77], network.port()).into(),
([88, 88, 88, 88], network.port()).into(),
([99, 99, 88, 99], network.port()).into(),
],
network,
..Config::default()
};
let mut alice = Peer::config(
"alice",
[48, 48, 48, 48],
vec![],
vec![],
vec![],
config,
rng.clone(),
);
alice.init();
let result = alice
.outputs()
.filter(|o| matches!(o, Io::Connect(_)))
.collect::<Vec<_>>();
assert_eq!(
result.len(),
alice.protocol.peermgr.config.target_outbound_peers
);
let mut attempted: Vec<net::SocketAddr> = result
.into_iter()
.map(|r| match r {
Io::Connect(addr) => addr,
_ => panic!(),
})
.collect();
rng.shuffle(&mut attempted);
attempted.pop().unwrap();
alice.elapse(peermgr::IDLE_TIMEOUT);
assert!(alice.outputs().all(|o| match o {
Io::Connect(addr) => !attempted.contains(&addr),
_ => true,
}));
}
#[test]
#[ignore]
fn test_connect_to_peers() {
quickcheck::QuickCheck::new()
.tests(100)
.max_tests(1000)
.min_tests_passed(90) .quickcheck(
simulations::connect_to_peers as fn(Options, u64, arbitrary::InRange<1, 6>) -> bool,
);
}
#[test]
fn test_connect_to_peers_1() {
assert!(simulations::connect_to_peers(
Options {
latency: 0..3,
failure_rate: 0.1294790448987514,
},
5190880195044658821,
arbitrary::InRange(8)
));
}
#[test]
fn test_connect_to_peers_2() {
assert!(simulations::connect_to_peers(
Options {
latency: 0..3,
failure_rate: 0.1391942598336996,
},
4237581564267684273,
arbitrary::InRange(8)
));
}
#[test]
fn test_connect_to_peers_3() {
assert!(simulations::connect_to_peers(
Options {
latency: 0..3,
failure_rate: 0.1070592131461427
},
18131621610609499524,
arbitrary::InRange(4)
));
}
#[test]
fn test_connect_to_peers_4() {
assert!(simulations::connect_to_peers(
Options {
latency: 1..3,
failure_rate: 0.18729837247381553
},
714649005678913971,
arbitrary::InRange(8)
));
}
#[test]
fn test_connect_to_peers_5() {
assert!(simulations::connect_to_peers(
Options {
latency: 1..3,
failure_rate: 0.2147059622448722
},
8568929271887842621,
arbitrary::InRange(4)
));
}
#[test]
fn test_connect_to_peers_6() {
assert!(simulations::connect_to_peers(
Options {
latency: 1..3,
failure_rate: 0.08821669803054283
},
7414060157716016948,
arbitrary::InRange(1)
));
}
#[test]
fn test_submit_transactions() {
let network = Network::Mainnet;
let time = LocalTime::now();
let mut rng = fastrand::Rng::new();
let mut alice = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng.clone());
let remote1 = PeerDummy {
addr: ([88, 88, 88, 88], 8333).into(),
height: 144,
protocol_version: PROTOCOL_VERSION,
services: ServiceFlags::NETWORK,
relay: true,
time,
};
let remote2 = PeerDummy {
addr: ([99, 99, 99, 99], 8333).into(),
relay: false,
..remote1
};
alice.connect(&remote1, Link::Outbound);
assert!(
alice
.protocol
.peermgr
.negotiated(Link::Outbound)
.map(|(p, _)| p)
.next()
.unwrap()
.relay
);
let (transmit, receive) = chan::bounded(1);
let tx = gen::transaction(&mut rng);
let wtxid = tx.txid();
let inventory = vec![Inventory::Transaction(wtxid)];
alice.connect(&remote2, Link::Outbound);
alice.command(Command::SubmitTransaction(tx.clone(), transmit));
let remotes = receive.recv().unwrap().unwrap();
assert_eq!(Vec::from(remotes), vec![remote1.addr]);
assert!(alice.protocol.invmgr.contains(&tx.wtxid()));
alice.tock();
alice
.messages(&remote1.addr)
.find(|msg| msg == &NetworkMessage::Inv(inventory.clone()))
.expect("Alice sends an `inv` message");
alice.elapse(LocalDuration::from_secs(30));
alice.received(&remote1.addr, NetworkMessage::GetData(inventory));
alice
.messages(&remote1.addr)
.find(|msg| matches!(msg, NetworkMessage::Tx(_)))
.expect("Alice responds to `getdata` with a `tx` message");
}
#[test]
fn test_inv_rebroadcast() {
let network = Network::Mainnet;
let mut rng = fastrand::Rng::new();
let mut alice = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng.clone());
let remote1 = ([88, 88, 88, 88], 8333).into();
let remote2 = ([99, 99, 99, 99], 8333).into();
let tx1 = gen::transaction(&mut rng);
let tx2 = gen::transaction(&mut rng);
let (transmit, _) = chan::unbounded();
alice.connect_addr(&remote1, Link::Outbound);
alice.command(Command::SubmitTransaction(tx1, transmit.clone()));
alice.command(Command::SubmitTransaction(tx2, transmit));
alice.tock(); alice
.messages(&remote1)
.find(|m| {
matches! {
m, NetworkMessage::Inv(inv)
if inv.len() == 2
}
})
.expect("Alice sends an initial `inv`");
alice.outputs().count(); alice.connect_addr(&remote2, Link::Outbound); alice.tock(); alice
.messages(&remote2)
.find(|m| matches!(m, NetworkMessage::Inv(_)))
.expect("Alice sends a first `inv` message to the new peer");
alice.drain();
alice.elapse(LocalDuration::from_mins(5));
alice
.messages(&remote1)
.find(|m| matches!(m, NetworkMessage::Inv(_)))
.expect("Alice sends a second `inv` message to the first peer");
alice
.messages(&remote2)
.find(|m| matches!(m, NetworkMessage::Inv(_)))
.expect("Alice sends a second `inv` message to the second peer");
alice.elapse(LocalDuration::from_mins(5));
alice
.messages(&remote1)
.find(|m| {
matches! {
m, NetworkMessage::Inv(_)
}
})
.expect("Alice sends a third `inv` message to the first peer");
}
#[test]
fn test_inv_partial_broadcast() {
let network = Network::Mainnet;
let mut rng = fastrand::Rng::new();
let mut alice = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng.clone());
let remote1 = ([88, 88, 88, 88], 8333).into();
let remote2 = ([99, 99, 99, 99], 8333).into();
let tx1 = gen::transaction(&mut rng);
let tx2 = gen::transaction(&mut rng);
let (transmit, _) = chan::unbounded();
alice.connect_addr(&remote1, Link::Outbound);
alice.connect_addr(&remote2, Link::Outbound);
alice.command(Command::SubmitTransaction(tx1.clone(), transmit.clone()));
alice.command(Command::SubmitTransaction(tx2.clone(), transmit));
alice.tock();
alice.elapse(LocalDuration::from_secs(3));
alice.received(
&remote1,
NetworkMessage::GetData(vec![Inventory::Transaction(tx1.txid())]),
);
alice.received(
&remote2,
NetworkMessage::GetData(vec![Inventory::Transaction(tx2.txid())]),
);
alice
.messages(&remote1)
.find(|msg| {
if let NetworkMessage::Tx(tx) = msg {
return tx.txid() == tx1.txid();
}
false
})
.expect("Alice responds with only the requested inventory to peer#1");
alice
.messages(&remote2)
.find(|msg| {
if let NetworkMessage::Tx(tx) = msg {
return tx.txid() == tx2.txid();
}
false
})
.expect("Alice responds with only the requested inventory to peer#2");
alice.drain();
alice.elapse(LocalDuration::from_mins(5));
alice
.messages(&remote1)
.find(|m| {
matches! {
m, NetworkMessage::Inv(inv)
if inv.first() == Some(&Inventory::Transaction(tx2.txid()))
}
})
.expect("Alice re-sends the missing inv to peer#1");
alice
.messages(&remote2)
.find(|m| {
matches! {
m, NetworkMessage::Inv(inv)
if inv.first() == Some(&Inventory::Transaction(tx1.txid()))
}
})
.expect("Alice re-sends the missing inv to peer#2");
alice.received(
&remote1,
NetworkMessage::GetData(vec![Inventory::Transaction(tx2.txid())]),
);
alice.received(
&remote2,
NetworkMessage::GetData(vec![Inventory::Transaction(tx1.txid())]),
);
alice.drain();
alice.elapse(LocalDuration::from_mins(5));
assert_eq!(
alice
.messages(&remote1)
.filter(|m| matches!(m, NetworkMessage::Inv(_)))
.count(),
0,
"Alice has nothing more to send"
);
assert_eq!(
alice
.messages(&remote2)
.filter(|m| matches!(m, NetworkMessage::Inv(_)))
.count(),
0,
"Alice has nothing more to send"
);
}
#[test]
fn test_confirmed_transaction() {
let mut rng = fastrand::Rng::new();
let network = Network::Regtest;
let remote: PeerId = ([88, 88, 88, 88], 8333).into();
let (transmit, _) = chan::unbounded();
let genesis = network.genesis_block();
let chain = gen::blockchain(genesis, 16, &mut rng);
let headers = NonEmpty::from_vec(chain.iter().map(|b| b.header).collect()).unwrap();
let mut alice = Peer::new(
"alice",
[48, 48, 48, 48],
network,
headers.tail,
vec![],
vec![],
rng.clone(),
);
let blk1 = &chain[rng.usize(1..chain.len() / 2)];
let blk2 = &chain[rng.usize(chain.len() / 2..chain.len())];
let tx1 = &blk1.txdata[rng.usize(0..blk1.txdata.len())];
let tx2 = &blk2.txdata[rng.usize(0..blk2.txdata.len())];
alice.connect_addr(&remote, Link::Outbound);
alice.command(Command::SubmitTransaction(tx1.clone(), transmit.clone()));
alice.command(Command::SubmitTransaction(tx2.clone(), transmit));
alice.tock();
assert!(alice.protocol.invmgr.contains(&tx1.wtxid()));
assert!(alice.protocol.invmgr.contains(&tx2.wtxid()));
alice.protocol.invmgr.get_block(blk1.block_hash());
alice.protocol.invmgr.get_block(blk2.block_hash());
alice.tock();
alice.received(&remote, NetworkMessage::Block(blk2.clone()));
alice
.events()
.find(|e| matches!(e, Event::Inventory(invmgr::Event::BlockReceived { .. })))
.expect("Alice receives the 2nd block");
alice.elapse(LocalDuration::from_mins(1));
alice.received(&remote, NetworkMessage::Block(blk1.clone()));
let mut events = alice.events().filter_map(|e| {
if let Event::Inventory(event) = e {
Some(event)
} else {
None
}
});
assert!(
matches! {
events.next().unwrap(),
invmgr::Event::BlockReceived { .. }
},
"Alice receives the 1st block"
);
assert!(
matches! {
events.next().unwrap(), invmgr::Event::Confirmed { block, transaction, .. }
if block == blk1.block_hash() && transaction.txid() == tx1.txid()
},
"Alice emits the first 'Confirmed' event"
);
assert!(
matches! {
events.next().unwrap(), invmgr::Event::BlockProcessed { block, .. }
if block.block_hash() == blk1.block_hash()
},
"Alice is done processing the first block"
);
assert!(
matches! {
events.next().unwrap(), invmgr::Event::Confirmed { block, transaction, .. }
if block == blk2.block_hash() && transaction.txid() == tx2.txid()
},
"Alice emits the second 'Confirmed' event"
);
events
.find(|e| {
matches!(
e, invmgr::Event::BlockProcessed { block, .. }
if block.block_hash() == blk2.block_hash()
)
})
.expect("Alice is done processing the second block");
assert_eq!(events.count(), 0);
assert!(alice.protocol.invmgr.is_empty());
}
#[test]
fn test_submitted_transaction_filtering() {
let height = 16;
let mut rng = fastrand::Rng::new();
logger::init(log::Level::Debug);
let network = Network::Regtest;
let remote: PeerId = ([88, 88, 88, 88], 8333).into();
let (transmit, _) = chan::unbounded();
let genesis = network.genesis_block();
let chain = gen::blockchain(genesis, height, &mut rng);
let headers = NonEmpty::from_vec(chain.iter().map(|b| b.header).collect()).unwrap();
let cfheader_genesis = FilterHeader::genesis(network);
let cfheaders = gen::cfheaders_from_blocks(cfheader_genesis, chain.iter())
.into_iter()
.skip(1) .collect::<Vec<_>>();
let filter_type = 0x0;
let mut alice = Peer::new(
"alice",
[48, 48, 48, 48],
network,
headers.tail,
cfheaders.clone(),
vec![],
rng.clone(),
);
let tx = gen::transaction(&mut rng);
alice.tick(LocalTime::from_block_time(chain.last().header.time));
alice.connect(
&PeerDummy {
addr: remote,
height,
protocol_version: PROTOCOL_VERSION,
services: cbfmgr::REQUIRED_SERVICES | syncmgr::REQUIRED_SERVICES,
relay: true,
time: alice.local_time(),
},
Link::Outbound,
);
alice.command(Command::Rescan {
from: Bound::Unbounded, to: Bound::Unbounded, watch: vec![], });
alice.command(Command::SubmitTransaction(tx.clone(), transmit));
alice.tock();
assert!(alice.protocol.invmgr.contains(&tx.wtxid()));
let matching = gen::block_with(&chain.last().header, vec![tx.clone()], &mut rng);
let cfilter = gen::cfilter(&matching);
let (_, parent) = cfheaders.last().unwrap();
let (cfhash, _) = gen::cfheader(parent, &cfilter);
alice.received(&remote, NetworkMessage::Headers(vec![matching.header]));
alice
.messages(&remote)
.find(|m| matches!(m, NetworkMessage::GetCFHeaders(_)))
.expect("Alice asks for the matching cfheaders");
alice.received(
&remote,
NetworkMessage::CFHeaders(CFHeaders {
filter_type,
stop_hash: matching.block_hash(),
previous_filter_header: *parent,
filter_hashes: vec![cfhash],
}),
);
alice
.messages(&remote)
.find(|m| matches!(m, NetworkMessage::GetCFilters(_)))
.expect("Alice asks for the cfilter");
alice.received(
&remote,
NetworkMessage::CFilter(CFilter {
filter_type,
block_hash: matching.block_hash(),
filter: cfilter.content,
}),
);
let expected = vec![Inventory::Block(matching.block_hash())];
alice.tock();
alice
.messages(&remote)
.find(|m| matches!(m, NetworkMessage::GetData(data) if data == &expected))
.expect("Alice asks for the matching block");
alice.received(&remote, NetworkMessage::Block(matching));
assert!(alice.protocol.invmgr.is_empty(), "The mempool is empty");
assert!(
!alice.protocol.cbfmgr.unwatch_transaction(&tx.txid()),
"The transaction is no longer watched"
);
}
#[test]
fn test_transaction_reverted_reconfirm() {
let height = 16;
let mut rng = fastrand::Rng::new();
logger::init(log::Level::Debug);
let network = Network::Regtest;
let remote: PeerId = ([88, 88, 88, 88], 8333).into();
let genesis = network.genesis_block();
let chain = gen::blockchain(genesis, height, &mut rng);
let chain_tip = chain.last();
let headers = NonEmpty::from_vec(chain.iter().map(|b| b.header).collect()).unwrap();
let cfheader_genesis = FilterHeader::genesis(network);
let cfheaders = gen::cfheaders_from_blocks(cfheader_genesis, chain.iter())
.into_iter()
.skip(1) .collect::<Vec<_>>();
let (_, cfheaders_tip) = cfheaders.last().unwrap();
let filter_type = 0x0;
let mut alice = Peer::new(
"alice",
[48, 48, 48, 48],
network,
headers.tail,
cfheaders.clone(),
vec![],
rng.clone(),
);
let tx = gen::transaction(&mut rng);
alice.connect(
&PeerDummy {
addr: remote,
height,
protocol_version: PROTOCOL_VERSION,
services: cbfmgr::REQUIRED_SERVICES | syncmgr::REQUIRED_SERVICES,
relay: true,
time: alice.local_time(),
},
Link::Outbound,
);
let (submit_reply, _) = chan::bounded(1);
alice.command(Command::Rescan {
from: Bound::Unbounded, to: Bound::Unbounded, watch: vec![], });
alice.command(Command::SubmitTransaction(tx.clone(), submit_reply));
alice.tock();
{
let matching = gen::block_with(&chain_tip.header, vec![tx.clone()], &mut rng);
let cfilter = gen::cfilter(&matching);
let (_, parent) = cfheaders.last().unwrap();
let (cfhash, _) = gen::cfheader(parent, &cfilter);
alice.tick(LocalTime::from_block_time(chain_tip.header.time));
alice.received(&remote, NetworkMessage::Headers(vec![matching.header]));
alice.received(
&remote,
NetworkMessage::CFHeaders(CFHeaders {
filter_type,
stop_hash: matching.block_hash(),
previous_filter_header: *parent,
filter_hashes: vec![cfhash],
}),
);
alice.received(
&remote,
NetworkMessage::CFilter(CFilter {
filter_type,
block_hash: matching.block_hash(),
filter: cfilter.content,
}),
);
alice.received(&remote, NetworkMessage::Block(matching));
alice
.events()
.find(|e| {
matches!(
e,
Event::Inventory(invmgr::Event::Confirmed { transaction, .. })
if transaction.txid() == tx.txid()
)
})
.expect("The transaction is confirmed");
}
{
let fork_trunk = gen::fork(&chain_tip.header, 3, &mut rng);
let fork_trunk_tip = fork_trunk.last().unwrap().header;
let fork_matching = gen::block_with(&fork_trunk_tip, vec![tx.clone()], &mut rng);
let fork = fork_trunk
.iter()
.chain(iter::once(&fork_matching))
.collect::<Vec<_>>();
let fork_tip = fork.last().unwrap();
let fork_cfheaders = gen::cfheaders_from_blocks(*cfheaders_tip, fork.iter().cloned())
.into_iter()
.collect::<Vec<_>>();
let fork_cfilters = gen::cfilters(fork.iter().cloned());
alice.received(
&remote,
NetworkMessage::Headers(fork.iter().map(|b| b.header).collect()),
);
alice
.events()
.find(|e| {
matches!(
e,
Event::Inventory(invmgr::Event::Reverted { transaction })
if transaction.txid() == tx.txid()
)
})
.expect("The transaction is reverted");
assert!(alice.protocol.invmgr.contains(&tx.wtxid()));
alice.tock();
alice
.messages(&remote)
.find(|m| {
matches!(
m,
NetworkMessage::GetCFHeaders(GetCFHeaders { stop_hash, .. })
if stop_hash == &fork_tip.block_hash()
)
})
.expect("Alice asks for the cfheaders");
alice.received(
&remote,
NetworkMessage::CFHeaders(CFHeaders {
filter_type,
stop_hash: fork_tip.block_hash(),
previous_filter_header: *cfheaders_tip,
filter_hashes: fork_cfheaders.iter().map(|(hash, _)| *hash).collect(),
}),
);
alice
.messages(&remote)
.find(|m| {
matches!(
m,
NetworkMessage::GetCFilters(GetCFilters { stop_hash, .. })
if stop_hash == &fork_tip.block_hash()
)
})
.expect("Alice asks for the cfilters");
for (cfilter, block) in fork_cfilters.into_iter().zip(fork.iter()) {
alice.received(
&remote,
NetworkMessage::CFilter(CFilter {
filter_type,
block_hash: block.block_hash(),
filter: cfilter.content,
}),
);
}
alice
.events()
.find(|e| {
matches!(
e,
Event::Filter(cbfmgr::Event::FilterProcessed { block, matched: true, .. })
if block == &fork_matching.block_hash()
)
})
.expect("The new block is matched");
alice.received(&remote, NetworkMessage::Block(fork_matching.clone()));
alice
.events()
.find(|e| {
matches!(
e,
Event::Inventory(invmgr::Event::Confirmed { transaction, block, .. })
if transaction.txid() == tx.txid() && block == &fork_matching.block_hash()
)
})
.expect("The transaction is re-confirmed");
}
}
#[test]
fn test_block_events() {
let mut rng = fastrand::Rng::new();
let network = Network::Regtest;
let genesis = network.genesis();
let remote: PeerId = ([88, 88, 88, 88], 8333).into();
let mut alice = Peer::genesis("alice", [48, 48, 48, 48], network, vec![], rng.clone());
let (transmit, import) = chan::unbounded();
let best = 16;
let headers = gen::headers(genesis, best, &mut rng);
let extra = gen::block(headers.last(), &mut rng);
let fork_height = 8;
let fork_best = 20;
let fork = gen::headers(
headers[fork_height as usize],
fork_best - fork_height,
&mut rng,
);
logger::init(log::Level::Debug);
fn filter(events: impl Iterator<Item = Event>) -> impl Iterator<Item = syncmgr::Event> {
events.filter_map(|e| match e {
Event::Chain(event @ syncmgr::Event::BlockConnected { .. }) => Some(event),
Event::Chain(event @ syncmgr::Event::BlockDisconnected { .. }) => Some(event),
Event::Chain(event @ syncmgr::Event::Synced { .. }) => Some(event),
_ => None,
})
}
alice.tick(LocalTime::from_block_time(headers.last().time));
alice.init();
alice.command(Command::ImportHeaders(
headers.tail.clone(),
transmit.clone(),
));
import.recv().unwrap().unwrap();
let mut events = filter(alice.events());
assert_matches!(
events.next().unwrap(),
syncmgr::Event::Synced(hash, height)
if height == 0 && hash == genesis.block_hash()
);
for (height_, header) in headers.iter().enumerate().skip(1) {
let hash_ = header.block_hash();
assert_matches!(
events.next().unwrap(),
syncmgr::Event::BlockConnected { height, header }
if height == height_ as Height && header.block_hash() == hash_
);
}
assert_matches!(events.next().unwrap(), syncmgr::Event::Synced(_, height) if height == best);
assert_eq!(events.count(), 0);
alice.connect_addr(&remote, Link::Outbound);
alice.received(
&remote,
NetworkMessage::Inv(vec![Inventory::Block(extra.block_hash())]),
);
alice.received(&remote, NetworkMessage::Headers(vec![extra.header]));
let mut events = filter(alice.events());
assert_matches!(
events.next().unwrap(),
syncmgr::Event::BlockConnected { height, header }
if height == best + 1 && header.block_hash() == extra.block_hash()
);
assert_matches!(
events.next().unwrap(),
syncmgr::Event::Synced(_, height) if height == best + 1
);
assert_eq!(0, events.count());
alice.tick(LocalTime::from_block_time(extra.header.time));
alice.command(Command::ImportHeaders(fork.tail.clone(), transmit));
import.recv().unwrap().unwrap();
let mut events = filter(alice.events());
assert_matches!(
events.next().unwrap(),
syncmgr::Event::BlockDisconnected { height, header }
if height == best + 1 && header.block_hash() == extra.block_hash()
);
for height_ in (fork_height + 1..=best).rev() {
let hash_ = headers[height_ as usize].block_hash();
assert_matches!(
events.next().unwrap(),
syncmgr::Event::BlockDisconnected { height, header }
if height == height_ as Height && header.block_hash() == hash_
);
}
for height_ in fork_height + 1..=fork_best {
let hash_ = fork[height_ as usize - fork_height as usize].block_hash();
assert_matches!(
events.next().unwrap(),
syncmgr::Event::BlockConnected { height, header }
if height == height_ as Height && header.block_hash() == hash_
);
}
assert_matches!(
events.next().unwrap(),
syncmgr::Event::Synced(_, height)
if height == fork_best
);
assert!(events.next().is_none());
}
#[test]
fn test_transaction_mempool_rebroadcast() {
}
#[test]
fn test_getdata_retry() {
}