#![allow(clippy::manual_range_contains)]
use std::collections::HashSet;
use std::sync::Arc;
use std::{fmt, io, net};
use nakamoto_common::bitcoin::network::constants::ServiceFlags;
use nakamoto_common::bitcoin::{Transaction, Txid};
use nakamoto_common::block::{Block, BlockHash, BlockHeader, Height};
use nakamoto_net::event::Emitter;
use nakamoto_net::Disconnect;
use nakamoto_p2p::fsm;
use nakamoto_p2p::fsm::fees::FeeEstimate;
use nakamoto_p2p::fsm::{Link, PeerId};
#[derive(Clone, Debug)]
pub enum Loading {
BlockHeaderLoaded {
height: Height,
},
FilterHeaderLoaded {
height: Height,
},
FilterHeaderVerified {
height: Height,
},
}
impl fmt::Display for Loading {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::BlockHeaderLoaded { height } => {
write!(fmt, "block header #{} loaded", height)
}
Self::FilterHeaderLoaded { height } => {
write!(fmt, "filter header #{} loaded", height)
}
Self::FilterHeaderVerified { height } => {
write!(fmt, "filter header #{} verified", height)
}
}
}
}
#[derive(Debug, Clone)]
pub enum Event {
Ready {
tip: Height,
filter_tip: Height,
},
PeerConnected {
addr: PeerId,
link: Link,
},
PeerDisconnected {
addr: PeerId,
reason: Disconnect<fsm::DisconnectReason>,
},
PeerConnectionFailed {
addr: PeerId,
error: Arc<io::Error>,
},
PeerNegotiated {
addr: PeerId,
link: Link,
services: ServiceFlags,
height: Height,
user_agent: String,
version: u32,
},
PeerHeightUpdated {
height: Height,
},
BlockConnected {
header: BlockHeader,
hash: BlockHash,
height: Height,
},
BlockDisconnected {
header: BlockHeader,
hash: BlockHash,
height: Height,
},
BlockMatched {
hash: BlockHash,
header: BlockHeader,
height: Height,
transactions: Vec<Transaction>,
},
FeeEstimated {
block: BlockHash,
height: Height,
fees: FeeEstimate,
},
FilterProcessed {
block: BlockHash,
height: Height,
matched: bool,
valid: bool,
},
TxStatusChanged {
txid: Txid,
status: TxStatus,
},
Synced {
height: Height,
tip: Height,
},
}
impl fmt::Display for Event {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Ready { .. } => {
write!(fmt, "ready to process events and commands")
}
Self::BlockConnected { hash, height, .. } => {
write!(fmt, "block {} connected at height {}", hash, height)
}
Self::BlockDisconnected { hash, height, .. } => {
write!(fmt, "block {} disconnected at height {}", hash, height)
}
Self::BlockMatched { hash, height, .. } => {
write!(
fmt,
"block {} ready to be processed at height {}",
hash, height
)
}
Self::FeeEstimated { fees, height, .. } => {
write!(
fmt,
"transaction median fee rate for block #{} is {} sat/vB",
height, fees.median,
)
}
Self::FilterProcessed {
height, matched, ..
} => {
write!(
fmt,
"filter processed at height {} (match = {})",
height, matched
)
}
Self::TxStatusChanged { txid, status } => {
write!(fmt, "transaction {} status changed: {}", txid, status)
}
Self::Synced { height, .. } => write!(fmt, "filters synced up to height {}", height),
Self::PeerConnected { addr, link } => {
write!(fmt, "peer {} connected ({:?})", &addr, link)
}
Self::PeerConnectionFailed { addr, error } => {
write!(
fmt,
"peer connection attempt to {} failed with {}",
&addr, error
)
}
Self::PeerHeightUpdated { height } => {
write!(fmt, "peer height updated to {}", height)
}
Self::PeerDisconnected { addr, reason } => {
write!(fmt, "disconnected from {} ({})", &addr, reason)
}
Self::PeerNegotiated {
addr,
height,
services,
..
} => write!(
fmt,
"peer {} negotiated with services {} and height {}..",
addr, services, height
),
}
}
}
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub enum TxStatus {
Unconfirmed,
Acknowledged {
peer: net::SocketAddr,
},
Confirmed {
height: Height,
block: BlockHash,
},
Reverted,
Stale {
replaced_by: Txid,
block: BlockHash,
},
}
impl fmt::Display for TxStatus {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Unconfirmed => write!(fmt, "transaction is unconfirmed"),
Self::Acknowledged { peer } => {
write!(fmt, "transaction was acknowledged by peer {}", peer)
}
Self::Confirmed { height, block } => write!(
fmt,
"transaction was included in block {} at height {}",
block, height
),
Self::Reverted => write!(fmt, "transaction has been reverted"),
Self::Stale { replaced_by, block } => write!(
fmt,
"transaction was replaced by {} in block {}",
replaced_by, block
),
}
}
}
pub(crate) struct Mapper {
tip: Height,
sync_height: Height,
filter_height: Height,
block_height: Height,
pending: HashSet<Height>,
}
impl Default for Mapper {
fn default() -> Self {
let tip = 0;
let sync_height = 0;
let filter_height = 0;
let block_height = 0;
let pending = HashSet::new();
Self {
tip,
sync_height,
filter_height,
block_height,
pending,
}
}
}
impl Mapper {
pub fn process(&mut self, event: fsm::Event, emitter: &Emitter<Event>) {
match event {
fsm::Event::Ready {
height,
filter_height,
..
} => {
emitter.emit(Event::Ready {
tip: height,
filter_tip: filter_height,
});
}
fsm::Event::Peer(fsm::PeerEvent::Connected(addr, link)) => {
emitter.emit(Event::PeerConnected { addr, link });
}
fsm::Event::Peer(fsm::PeerEvent::ConnectionFailed(addr, error)) => {
emitter.emit(Event::PeerConnectionFailed { addr, error });
}
fsm::Event::Peer(fsm::PeerEvent::Negotiated {
addr,
link,
services,
user_agent,
height,
version,
}) => {
emitter.emit(Event::PeerNegotiated {
addr,
link,
services,
user_agent,
height,
version,
});
}
fsm::Event::Peer(fsm::PeerEvent::Disconnected(addr, reason)) => {
emitter.emit(Event::PeerDisconnected { addr, reason });
}
fsm::Event::Chain(fsm::ChainEvent::PeerHeightUpdated { height }) => {
emitter.emit(Event::PeerHeightUpdated { height });
}
fsm::Event::Chain(fsm::ChainEvent::Synced(_, height)) => {
self.tip = height;
}
fsm::Event::Chain(fsm::ChainEvent::BlockConnected { header, height }) => {
emitter.emit(Event::BlockConnected {
header,
hash: header.block_hash(),
height,
});
}
fsm::Event::Chain(fsm::ChainEvent::BlockDisconnected { header, height }) => {
emitter.emit(Event::BlockDisconnected {
header,
hash: header.block_hash(),
height,
});
}
fsm::Event::Inventory(fsm::InventoryEvent::BlockProcessed {
block,
height,
fees,
}) => {
let hash = self.process_block(block, height, emitter);
if let Some(fees) = fees {
emitter.emit(Event::FeeEstimated {
block: hash,
height,
fees,
});
}
}
fsm::Event::Inventory(fsm::InventoryEvent::Confirmed {
transaction,
height,
block,
}) => {
emitter.emit(Event::TxStatusChanged {
txid: transaction.txid(),
status: TxStatus::Confirmed { height, block },
});
}
fsm::Event::Inventory(fsm::InventoryEvent::Acknowledged { txid, peer }) => {
emitter.emit(Event::TxStatusChanged {
txid,
status: TxStatus::Acknowledged { peer },
});
}
fsm::Event::Filter(fsm::FilterEvent::RescanStarted { start, .. }) => {
self.pending.clear();
self.filter_height = start;
self.sync_height = start;
self.block_height = start;
}
fsm::Event::Filter(fsm::FilterEvent::FilterProcessed {
block,
height,
matched,
valid,
..
}) => {
self.process_filter(block, height, matched, valid, emitter);
}
_ => {}
}
assert!(
self.block_height <= self.filter_height,
"Filters are processed before blocks"
);
assert!(
self.sync_height <= self.filter_height,
"Filters are processed before we are done"
);
let height = if self.pending.is_empty() {
self.filter_height
} else {
self.block_height
};
if height > self.sync_height {
self.sync_height = height;
emitter.emit(Event::Synced {
height,
tip: self.tip,
});
}
}
fn process_block(
&mut self,
block: Block,
height: Height,
emitter: &Emitter<Event>,
) -> BlockHash {
let hash = block.block_hash();
if !self.pending.remove(&height) {
return hash;
}
log::debug!("Received block {} at height {}", hash, height);
debug_assert!(height >= self.block_height);
self.block_height = height;
emitter.emit(Event::BlockMatched {
height,
hash,
header: block.header,
transactions: block.txdata,
});
hash
}
fn process_filter(
&mut self,
block: BlockHash,
height: Height,
matched: bool,
valid: bool,
emitter: &Emitter<Event>,
) {
debug_assert!(height >= self.filter_height);
if matched {
log::debug!("Filter matched for block #{}", height);
self.pending.insert(height);
}
self.filter_height = height;
emitter.emit(Event::FilterProcessed {
height,
matched,
valid,
block,
});
}
}
#[cfg(test)]
mod test {
use std::io;
use nakamoto_common::bitcoin_hashes::Hash;
use quickcheck::TestResult;
use quickcheck_macros::quickcheck;
use nakamoto_common::block::time::Clock as _;
use nakamoto_common::network::Network;
use nakamoto_net::{Disconnect, Link, LocalTime, StateMachine as _};
use nakamoto_test::assert_matches;
use nakamoto_test::block::gen;
use super::Event;
use super::*;
use crate::handle::Handle as _;
use crate::tests::mock;
use crate::Command;
#[test]
fn test_ready_event() {
let network = Network::Regtest;
let mut client = mock::Client::new(network);
let handle = client.handle();
let events = handle.events();
let time = LocalTime::now();
client.protocol.initialize(time);
client.step();
assert_matches!(events.try_recv(), Ok(Event::Ready { .. }));
}
#[test]
fn test_peer_connected_disconnected() {
let network = Network::Regtest;
let mut client = mock::Client::new(network);
let handle = client.handle();
let remote = ([44, 44, 44, 44], 8333).into();
let local_addr = ([0, 0, 0, 0], 16333).into();
let events = handle.events();
client
.protocol
.connected(remote, &local_addr, Link::Inbound);
client.step();
assert_matches!(
events.try_recv(),
Ok(Event::PeerConnected { addr, link, .. })
if addr == remote && link == Link::Inbound
);
client.protocol.disconnected(
&remote,
Disconnect::ConnectionError(io::Error::from(io::ErrorKind::UnexpectedEof).into()),
);
client.step();
assert_matches!(
events.try_recv(),
Ok(Event::PeerDisconnected { addr, reason: Disconnect::ConnectionError(_) })
if addr == remote
);
}
#[test]
fn test_peer_connection_failed() {
let network = Network::Regtest;
let mut client = mock::Client::new(network);
let handle = client.handle();
let remote = ([44, 44, 44, 44], 8333).into();
let events = handle.events();
client.protocol.command(Command::Connect(remote));
client.protocol.attempted(&remote);
client.step();
assert_matches!(events.try_recv(), Err(_));
client.protocol.disconnected(
&remote,
Disconnect::ConnectionError(io::Error::from(io::ErrorKind::UnexpectedEof).into()),
);
client.step();
assert_matches!(
events.try_recv(),
Ok(Event::PeerConnectionFailed { addr, error })
if addr == remote && error.kind() == io::ErrorKind::UnexpectedEof
);
}
#[test]
fn test_peer_height_updated() {
use nakamoto_common::bitcoin::network::address::Address;
use nakamoto_common::bitcoin::network::constants::ServiceFlags;
use nakamoto_common::bitcoin::network::message::NetworkMessage;
use nakamoto_common::bitcoin::network::message_network::VersionMessage;
let network = Network::default();
let mut client = mock::Client::new(network);
let handle = client.handle();
let remote = ([44, 44, 44, 44], 8333).into();
let local_time = LocalTime::now();
let local_addr = ([0, 0, 0, 0], 16333).into();
let events = handle.events();
let version = |height: Height| -> NetworkMessage {
NetworkMessage::Version(VersionMessage {
version: fsm::MIN_PROTOCOL_VERSION,
services: ServiceFlags::NETWORK,
timestamp: local_time.block_time() as i64,
receiver: Address::new(&remote, ServiceFlags::NONE),
sender: Address::new(&local_addr, ServiceFlags::NONE),
nonce: 42,
user_agent: "?".to_owned(),
start_height: height as i32,
relay: false,
})
};
client
.protocol
.connected(remote, &local_addr, Link::Inbound);
client.received(&remote, version(42));
client.received(&remote, NetworkMessage::Verack);
client.step();
events
.try_iter()
.find(|e| matches!(e, Event::PeerHeightUpdated { height } if *height == 42))
.expect("We receive an event for the updated peer height");
let remote = ([45, 45, 45, 45], 8333).into();
client
.protocol
.connected(remote, &local_addr, Link::Inbound);
client.received(&remote, version(43));
client.received(&remote, NetworkMessage::Verack);
client.step();
events
.try_iter()
.find(|e| matches!(e, Event::PeerHeightUpdated { height } if *height == 43))
.expect("We receive an event for the updated peer height");
}
#[test]
fn test_peer_negotiated() {
use nakamoto_common::bitcoin::network::address::Address;
use nakamoto_common::bitcoin::network::constants::ServiceFlags;
use nakamoto_common::bitcoin::network::message::NetworkMessage;
use nakamoto_common::bitcoin::network::message_network::VersionMessage;
let network = Network::default();
let mut client = mock::Client::new(network);
let handle = client.handle();
let remote = ([44, 44, 44, 44], 8333).into();
let local_time = LocalTime::now();
let local_addr = ([0, 0, 0, 0], 16333).into();
let events = handle.events();
client
.protocol
.connected(remote, &local_addr, Link::Inbound);
client.step();
let version = NetworkMessage::Version(VersionMessage {
version: fsm::MIN_PROTOCOL_VERSION,
services: ServiceFlags::NETWORK,
timestamp: local_time.block_time() as i64,
receiver: Address::new(&remote, ServiceFlags::NONE),
sender: Address::new(&local_addr, ServiceFlags::NONE),
nonce: 42,
user_agent: "?".to_owned(),
start_height: 42,
relay: false,
});
client.received(&remote, version);
client.received(&remote, NetworkMessage::Verack);
client.step();
assert_matches!(events.try_recv(), Ok(Event::PeerConnected { .. }));
assert_matches!(
events.try_recv(),
Ok(Event::PeerNegotiated { addr, height, user_agent, .. })
if addr == remote && height == 42 && user_agent == "?"
);
}
#[quickcheck]
fn prop_client_side_filtering(birth: Height, height: Height, seed: u64) -> TestResult {
if height < 1 || height > 24 || birth >= height {
return TestResult::discard();
}
let mut rng = fastrand::Rng::with_seed(seed);
let network = Network::Regtest;
let genesis = network.genesis_block();
let chain = gen::blockchain(genesis, height, &mut rng);
let mut mock = mock::Client::new(network);
let mut client = mock.handle();
client.tip = (height, chain[height as usize].header);
let mut spent = 0;
let (watch, heights, balance) = gen::watchlist_rng(birth, chain.iter(), &mut rng);
log::debug!(
"-- Test case with birth = {} and height = {}",
birth,
height
);
let subscriber = client.events();
mock.subscriber
.broadcast(fsm::Event::Chain(fsm::ChainEvent::Synced(
chain.last().block_hash(),
height,
)));
for h in birth..=height {
let matched = heights.contains(&h);
let block = chain[h as usize].clone();
mock.subscriber
.broadcast(fsm::Event::Filter(fsm::FilterEvent::FilterProcessed {
block: block.block_hash(),
height: h,
matched,
cached: false,
valid: true,
}));
if matched {
mock.subscriber.broadcast(fsm::Event::Inventory(
fsm::InventoryEvent::BlockProcessed {
block,
height: h,
fees: None,
},
));
}
}
for event in subscriber.try_iter() {
match event {
Event::BlockMatched { transactions, .. } => {
for t in &transactions {
for output in &t.output {
if watch.contains(&output.script_pubkey) {
spent += output.value;
}
}
}
}
Event::Synced {
height: sync_height,
tip,
} => {
assert_eq!(height, tip);
if sync_height == tip {
break;
}
}
_ => {}
}
}
assert_eq!(balance, spent);
client.shutdown().unwrap();
TestResult::passed()
}
#[test]
fn test_tx_status_ordering() {
assert!(
TxStatus::Unconfirmed
< TxStatus::Acknowledged {
peer: ([0, 0, 0, 0], 0).into()
}
);
assert!(
TxStatus::Acknowledged {
peer: ([0, 0, 0, 0], 0).into()
} < TxStatus::Confirmed {
height: 0,
block: BlockHash::all_zeros(),
}
);
assert!(
TxStatus::Confirmed {
height: 0,
block: BlockHash::all_zeros(),
} < TxStatus::Reverted
);
assert!(
TxStatus::Reverted
< TxStatus::Stale {
replaced_by: Txid::all_zeros(),
block: BlockHash::all_zeros()
}
);
}
}