mod blocks;
mod genesis;
mod messages;
use crate::blocks::Block;
use crate::network::Network;
use crate::node::header::{Extensions, Header, MessageType};
use crate::node::messages::frontier_resp::FrontierResp;
use crate::node::state::ArcState;
use crate::node::wire::Wire;
use crate::{to_hex, Public, Rai};
use anyhow::{anyhow, Context};
use std::fmt::Debug;
use std::net::SocketAddr;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::{debug, instrument, trace};
#[derive(Debug)]
pub struct Packet {
pub annotation: Option<String>,
pub data: Vec<u8>,
}
impl Packet {
pub fn new(data: Vec<u8>) -> Self {
Self {
data,
annotation: None,
}
}
pub fn new_with_annotation(data: Vec<u8>, annotation: String) -> Self {
Self {
data,
annotation: Some(annotation),
}
}
}
pub struct Controller {
pub validate_handshakes: bool,
network: Network,
state: ArcState,
peer_addr: SocketAddr,
frontier_stream: bool,
incoming_buffer: Vec<u8>,
incoming: Receiver<Packet>,
outgoing: Sender<Packet>,
pub(crate) header: Header,
last_annotation: Option<String>,
}
impl Controller {
pub fn new_with_channels(
network: Network,
state: ArcState,
peer_addr: SocketAddr,
) -> (Self, Sender<Packet>, Receiver<Packet>) {
let (incoming_tx, incoming_rx) = mpsc::channel::<Packet>(100);
let (outgoing_tx, outgoing_rx) = mpsc::channel::<Packet>(100);
let s = Self {
validate_handshakes: true,
network,
state,
peer_addr,
frontier_stream: false,
incoming_buffer: Vec::with_capacity(10_000),
incoming: incoming_rx,
outgoing: outgoing_tx,
header: Header::new(network, MessageType::Handshake, Extensions::new()),
last_annotation: None,
};
(s, incoming_tx, outgoing_rx)
}
pub async fn run(mut self) -> anyhow::Result<()> {
macro_rules! handle {
($self: ident, $fun:ident, $header:expr) => {{
let sh = Some(&$header);
let payload = self
.recv(sh)
.await
.with_context(|| format!("Receiving payload for {:?}", $header))?;
match &self.last_annotation {
Some(a) => debug!("{} {:?}", a, &payload),
None => debug!("{:?}", &payload),
};
$self
.$fun(&$header, payload)
.await
.with_context(|| format!("Handling payload for {:?}", $header))?;
};};
}
trace!("Initial handshake");
self.send_handshake().await?;
loop {
if self.frontier_stream {
let payload = self.recv::<FrontierResp>(None).await?;
self.handle_frontier_resp(payload).await?;
} else {
let header = self.recv::<Header>(None).await?;
header.validate(&self.network)?;
match header.message_type() {
MessageType::Keepalive => handle!(self, handle_keepalive, header),
MessageType::Publish => handle!(self, handle_publish, header),
MessageType::ConfirmReq => handle!(self, handle_confirm_req, header),
MessageType::ConfirmAck => handle!(self, handle_confirm_ack, header),
MessageType::FrontierReq => handle!(self, handle_frontier_req, header),
MessageType::Handshake => handle!(self, handle_handshake, header),
MessageType::TelemetryReq => handle!(self, handle_telemetry_req, header),
MessageType::TelemetryAck => handle!(self, handle_telemetry_ack, header),
_ => panic!("{:?}", header),
};
}
}
}
#[instrument(skip(self, header))]
async fn recv<T: Wire + Debug>(&mut self, header: Option<&Header>) -> anyhow::Result<T> {
let expected_len = T::len(header)?;
let buffer = self.recv_buf(expected_len).await?;
trace!("HEX: {}", to_hex(&buffer));
let result = T::deserialize(header, &buffer)?;
Ok(result)
}
async fn recv_buf(&mut self, size: usize) -> anyhow::Result<Vec<u8>> {
loop {
if self.incoming_buffer.len() >= size {
return self.recv_immediate(size);
}
let packet = match self.incoming.recv().await {
Some(data) => data,
None => {
return Err(anyhow!(
"Incoming stream disconnected {:?} {:?}",
self.peer_addr,
self.last_annotation
))
}
};
if let Some(annotation) = packet.annotation {
self.last_annotation = Some(annotation);
}
self.incoming_buffer.extend(packet.data);
}
}
fn recv_immediate(&mut self, size: usize) -> anyhow::Result<Vec<u8>> {
debug_assert!(self.incoming_buffer.len() >= size);
let buf = self.incoming_buffer[0..size].to_owned();
self.incoming_buffer = Vec::from(&self.incoming_buffer[size..]);
Ok(buf)
}
#[instrument(level = "debug", skip(self, message))]
async fn send<T: Wire + Debug>(&mut self, message: &T) -> anyhow::Result<()> {
let data = message.serialize();
trace!("HEX {}", to_hex(&data));
debug!("OBJ {:?}", &message);
self.outgoing.send(Packet::new(Vec::from(data))).await?;
Ok(())
}
async fn send_header(
&mut self,
message_type: MessageType,
ext: Extensions,
) -> anyhow::Result<()> {
let mut header = self.header;
header.reset(message_type, ext);
Ok(self.send(&header).await?)
}
pub async fn init(&mut self) -> anyhow::Result<()> {
self.ensure_genesis().await.context("Ensuring genesis")?;
Ok(())
}
pub async fn balance_rep_weights(&mut self, _full_block: &Block) -> anyhow::Result<()> {
todo!()
}
pub async fn account_balance(&self, account: &Public) -> anyhow::Result<Rai> {
let context = || anyhow!("Account balance for {:?}", account);
let block = self.get_latest_block(account).await.with_context(context)?;
match block {
Some(block) => Ok(block.balance().to_owned()),
None => Ok(Rai::zero()),
}
}
pub fn network(&self) -> &Network {
&self.network
}
pub fn peer_addr(&self) -> &SocketAddr {
&self.peer_addr
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::blocks::{Block, BlockHash, OpenBlock, Previous, SendBlock};
use crate::node::state::MemoryState;
use crate::{Address, DEFAULT_PORT};
use std::net::{Ipv4Addr, SocketAddrV4};
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
async fn empty_lattice(network: Network) -> Controller {
let state = Arc::new(Mutex::new(MemoryState::new(network)));
let (mut controller, _rx, _tx) = Controller::new_with_channels(
network,
state,
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_PORT)),
);
controller.init().await.unwrap();
controller
}
#[tokio::test]
async fn genesis() {
let network = Network::Live;
let genesis = network.genesis_block();
let controller = empty_lattice(network).await;
assert_eq!(
controller
.get_latest_block(genesis.account())
.await
.unwrap()
.unwrap()
.balance(),
&Rai::max()
);
}
#[tokio::test]
async fn send_then_recv_to_new_account() {
let network = Network::Live;
let genesis = network.genesis_block();
let landing_account =
Address::from_str("nano_13ezf4od79h1tgj9aiu4djzcmmguendtjfuhwfukhuucboua8cpoihmh8byo")
.unwrap()
.to_public();
let mut controller = empty_lattice(network).await;
let gen_send: SendBlock = serde_json::from_str(
r#"{
"type": "send",
"previous": "991CF190094C00F0B68E2E5F75F6BEE95A2E0BD93CEAA4A6734DB9F19B728948",
"destination": "nano_13ezf4od79h1tgj9aiu4djzcmmguendtjfuhwfukhuucboua8cpoihmh8byo",
"balance": "FD89D89D89D89D89D89D89D89D89D89D",
"work": "3c82cc724905ee95",
"signature": "5B11B17DB9C8FE0CC58CAC6A6EECEF9CB122DA8A81C6D3DB1B5EE3AB065AA8F8CB1D6765C8EB91B58530C5FF5987AD95E6D34BB57F44257E20795EE412E61600"
}"#,
)
.unwrap();
let mut block: Block =
Block::from_send_block(&gen_send, genesis.account(), genesis.representative());
block.calc_hash().unwrap();
controller.add_elected_block(&block).await.unwrap();
let given = Rai::from(3271945835778254456378601994536232802u128);
let genesis_balance = Rai::max().checked_sub(&given).unwrap();
assert_eq!(
controller
.account_balance(&genesis.account())
.await
.unwrap(),
genesis_balance
);
assert_eq!(
controller.account_balance(&landing_account).await.unwrap(),
Rai::zero()
);
let land_open: OpenBlock = serde_json::from_str(
r#"{
"type": "open",
"source": "A170D51B94E00371ACE76E35AC81DC9405D5D04D4CEBC399AEACE07AE05DD293",
"representative": "nano_1awsn43we17c1oshdru4azeqjz9wii41dy8npubm4rg11so7dx3jtqgoeahy",
"account": "nano_13ezf4od79h1tgj9aiu4djzcmmguendtjfuhwfukhuucboua8cpoihmh8byo",
"work": "e997c097a452a1b1",
"signature": "E950FFDF0C9C4DAF43C27AE3993378E4D8AD6FA591C24497C53E07A3BC80468539B0A467992A916F0DDA6F267AD764A3C1A5BDBD8F489DFAE8175EEE0E337402"
}"#,
).unwrap();
let mut land_open = Block::from_open_block(&land_open, &Previous::Open, &given);
land_open.calc_hash().unwrap();
assert_eq!(
land_open.hash().unwrap(),
&BlockHash::from_str(
"90D0C16AC92DD35814E84BFBCC739A039615D0A42A76EF44ADAEF1D99E9F8A35"
)
.unwrap()
);
controller.add_elected_block(&land_open).await.unwrap();
assert_eq!(
controller.account_balance(&landing_account).await.unwrap(),
given
);
let land_send: SendBlock = serde_json::from_str(
r#"{
"type": "send",
"previous": "90D0C16AC92DD35814E84BFBCC739A039615D0A42A76EF44ADAEF1D99E9F8A35",
"destination": "nano_35jjmmmh81kydepzeuf9oec8hzkay7msr6yxagzxpcht7thwa5bus5tomgz9",
"balance": "02761762762762762762762762762762",
"work": "6d6d59ca60cab77d",
"signature": "434CF7E7B2C2CAA3E3910CC711B29498870636C1247EA8C72BD5C0A7BB15A7BACFEC9CF289B92E4BD56F56E68277B45B3A3FF9339D2547038B87DE38C851B70B"
}"#).unwrap();
let mut land_send =
Block::from_send_block(&land_send, &landing_account, &land_open.representative());
land_send.calc_hash().unwrap();
controller.add_elected_block(&land_send).await.unwrap();
assert_eq!(
controller.account_balance(&landing_account).await.unwrap(),
given
.checked_sub(&Rai::from(324518553658426726783156020576256))
.unwrap()
);
}
}