#![recursion_limit = "128"]
#![forbid(
exceeding_bitshifts,
mutable_transmutes,
no_mangle_const_items,
unknown_crate_types,
warnings
)]
#![deny(
bad_style,
deprecated,
improper_ctypes,
missing_docs,
non_shorthand_field_patterns,
overflowing_literals,
plugin_as_library,
stable_features,
unconditional_recursion,
unknown_lints,
unsafe_code,
unused,
unused_allocation,
unused_attributes,
unused_comparisons,
unused_features,
unused_parens,
while_true,
clippy::unicode_not_nfc,
clippy::wrong_pub_self_convention,
clippy::option_unwrap_used
)]
#![warn(
trivial_casts,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
unused_results
)]
#[macro_use]
extern crate log;
#[macro_use]
extern crate quick_error;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate structopt;
#[macro_use]
extern crate unwrap;
pub use config::{Config, OurType, SerialisableCertificate};
pub use error::Error;
pub use event::Event;
pub use peer::{NodeInfo, Peer};
pub use peer_config::{DEFAULT_IDLE_TIMEOUT_MSEC, DEFAULT_KEEP_ALIVE_INTERVAL_MSEC};
pub use utils::{Token, R};
use crate::wire_msg::WireMsg;
use bootstrap_cache::BootstrapCache;
use context::{ctx, ctx_mut, initialise_ctx, Context};
use crossbeam_channel as mpmc;
use event_loop::EventLoop;
use std::collections::VecDeque;
use std::mem;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket};
use std::sync::mpsc;
use tokio::prelude::Future;
use tokio::runtime::current_thread;
mod bootstrap;
mod bootstrap_cache;
mod communicate;
mod config;
mod connect;
mod connection;
mod context;
mod dirs;
mod error;
mod event;
mod event_loop;
mod listener;
mod peer;
mod peer_config;
#[cfg(test)]
mod test_utils;
mod utils;
mod wire_msg;
/// Default maximum allowed message size in bytes (500 MiB).
///
/// Used by `QuicP2p::activate` when the configuration does not set `max_msg_size_allowed`.
pub const DEFAULT_MAX_ALLOWED_MSG_SIZE: usize = 500 * 1024 * 1024;
/// Port that is tried first when the configuration does not specify one.
///
/// If binding to this port fails, `QuicP2p::activate` falls back to a random port (port `0`).
pub const DEFAULT_PORT_TO_TRY: u16 = 443;
/// Builder for `QuicP2p`.
///
/// Collects the event sender, an optional configuration and an optional list of proxies, then
/// creates and activates the `QuicP2p` instance in `build`.
pub struct Builder {
/// Channel handed to the built `QuicP2p` for reporting `Event`s.
event_tx: mpmc::Sender<Event>,
/// Explicit configuration; when `None`, `build` reads or constructs the default configuration.
cfg: Option<Config>,
/// Proxies that `build` puts into the bootstrap cache.
proxies: VecDeque<NodeInfo>,
/// When `true`, `proxies` replace the bootstrap cache contents; otherwise they are appended.
use_proxies_exclusively: bool,
}
impl Builder {
pub fn new(event_tx: mpmc::Sender<Event>) -> Self {
Self {
event_tx,
cfg: Default::default(),
proxies: Default::default(),
use_proxies_exclusively: Default::default(),
}
}
/// Registers proxies to be placed into the bootstrap cache when `build` is called.
///
/// With `use_exclusively == true` the given `proxies` replace any proxies registered on this
/// builder so far (and, on `build`, the bootstrap cache contents). Otherwise they are appended
/// to the ones already registered. The exclusivity flag of the latest call wins.
pub fn with_proxies(mut self, proxies: VecDeque<NodeInfo>, use_exclusively: bool) -> Self {
    self.use_proxies_exclusively = use_exclusively;
    if !use_exclusively {
        self.proxies.extend(proxies);
        return self;
    }
    self.proxies = proxies;
    self
}
/// Sets the configuration to use, replacing any configuration given earlier.
///
/// Without this call `build` reads or constructs the default configuration.
pub fn with_config(mut self, cfg: Config) -> Self {
    self.cfg = cfg.into();
    self
}
pub fn build(self) -> R<QuicP2p> {
let mut qp2p = if let Some(cfg) = self.cfg {
QuicP2p::with_config(self.event_tx, cfg)
} else {
QuicP2p::new(self.event_tx)?
};
qp2p.activate()?;
let use_proxies_exclusively = self.use_proxies_exclusively;
let proxies = self.proxies;
qp2p.el.post(move || {
ctx_mut(|c| {
if use_proxies_exclusively {
let _ = mem::replace(c.bootstrap_cache.peers_mut(), proxies);
} else {
c.bootstrap_cache.peers_mut().extend(proxies.into_iter());
}
})
});
Ok(qp2p)
}
}
/// Main handle of the crate.
///
/// All operations are posted as closures onto the owned event loop; results are reported through
/// the event channel given at construction time.
pub struct QuicP2p {
/// Sender for `Event`s; a clone is handed to the event-loop context by `activate`.
event_tx: mpmc::Sender<Event>,
/// Configuration this instance was created with.
cfg: Config,
/// Cached result of `our_connection_info`; `None` until it has been computed successfully.
us: Option<NodeInfo>,
/// Event loop onto which all work is posted.
el: EventLoop,
}
impl QuicP2p {
/// Starts bootstrapping by posting `bootstrap::start` onto the event loop.
///
/// The call returns immediately; the bootstrap itself runs on the event loop.
pub fn bootstrap(&mut self) {
    let start_bootstrap = || {
        bootstrap::start();
    };
    self.el.post(start_bootstrap);
}
/// Asks the event loop to connect to the given peer.
///
/// If the connection attempt cannot even be started, an `Event::ConnectionFailure` carrying the
/// peer address and the error is sent on the event channel. Otherwise the connection entry for
/// the peer (if present) is flagged as "we contacted the peer".
pub fn connect_to(&mut self, peer_info: NodeInfo) {
    self.el.post(move || {
        // Copy the address out first: `peer_info` is moved into `connect::connect_to`.
        let peer_addr = peer_info.peer_addr;
        match connect::connect_to(peer_info, None, None) {
            Ok(_) => Self::set_we_contacted_peer(&peer_addr),
            Err(e) => {
                info!("Could not connect to the asked peer: {}", e);
                ctx_mut(|c| {
                    // Best effort: a closed event channel is deliberately ignored.
                    let _ = c
                        .event_tx
                        .send(Event::ConnectionFailure { peer_addr, err: e });
                });
            }
        }
    });
}
/// Asks the event loop to remove the connection entry for `peer_addr`.
///
/// If no such entry exists, only a debug message is logged.
pub fn disconnect_from(&mut self, peer_addr: SocketAddr) {
    self.el.post(move || {
        ctx_mut(|c| {
            let removed = c.connections.remove(&peer_addr);
            if removed.is_none() {
                debug!("Asked to disconnect from an unknown peer");
            }
        })
    });
}
/// Sends `msg` to `peer` as a user message.
///
/// The write is attempted on the event loop. `token` is passed along with the message so the
/// caller can correlate later events with this send. Afterwards the connection entry for the
/// peer (if present) is flagged as "we contacted the peer".
pub fn send(&mut self, peer: Peer, msg: bytes::Bytes, token: Token) {
    self.el.post(move || {
        // Read the address before `peer` is moved into the write call.
        let target_addr = peer.peer_addr();
        let wire_msg = WireMsg::UserMsg(msg);
        communicate::try_write_to_peer(peer, wire_msg, token);
        Self::set_we_contacted_peer(&target_addr);
    });
}
/// Returns our connection information (address and certificate), computing it on first use.
///
/// The result is cached in `self.us`; later calls return a clone of the cached value.
///
/// The address is obtained from the IP echo service. If no echo server is available
/// (`Error::NoEndpointEchoServerFound`), the local address of the QUIC endpoint is used instead,
/// unless its IP is unspecified, in which case the original error is returned.
///
/// # Errors
///
/// Returns an error if the echo service query fails, if the local address cannot be obtained,
/// or if the event loop drops the reply channel before answering. The last case used to panic.
pub fn our_connection_info(&mut self) -> R<NodeInfo> {
    if let Some(ref us) = self.us {
        return Ok(us.clone());
    }

    let our_addr = match self.query_ip_echo_service() {
        Ok(addr) => addr,
        Err(e @ Error::NoEndpointEchoServerFound) => {
            let (tx, rx) = mpsc::channel();
            self.el.post(move || {
                let local_addr_res = ctx(|c| c.quic_ep().local_addr());
                // A failed send only means the requester is gone; do not bring down the
                // event loop thread for that.
                let _ = tx.send(local_addr_res);
            });
            // First `?`: the event loop dropped the sender without replying.
            // Second `?`: the endpoint could not report its local address.
            let addr = rx.recv()??;
            if addr.ip().is_unspecified() {
                return Err(e);
            }
            addr
        }
        Err(e) => return Err(e),
    };

    let our_cert_der = self.our_certificate_der();
    let us = NodeInfo {
        peer_addr: our_addr,
        peer_cert_der: our_cert_der,
    };
    self.us = Some(us.clone());

    Ok(us)
}
/// Returns a snapshot (clone) of the peers currently held in the bootstrap cache.
///
/// The snapshot is taken on the event loop and sent back over a channel; this call blocks until
/// it arrives.
///
/// # Errors
///
/// Returns an error if the reply channel is closed before a snapshot is received.
pub fn bootstrap_cache(&mut self) -> R<Vec<NodeInfo>> {
    let (snapshot_tx, snapshot_rx) = mpsc::channel();
    self.el.post(move || {
        let peers = ctx(|c| c.bootstrap_cache.peers().iter().cloned().collect());
        // The requester may have gone away in the meantime; that is not an error here.
        let _ = snapshot_tx.send(peers);
    });
    Ok(snapshot_rx.recv()?)
}
/// Returns `true` if `node_info` is one of the hard coded contacts of our configuration.
pub fn is_hard_coded_contact(&self, node_info: &NodeInfo) -> bool {
    let contacts = &self.cfg.hard_coded_contacts;
    contacts.contains(node_info)
}
/// Creates an (unactivated) instance using the default configuration, which is read or
/// constructed via `Config::read_or_construct_default`.
///
/// # Errors
///
/// Returns an error if the default configuration cannot be obtained.
fn new(event_tx: mpmc::Sender<Event>) -> R<Self> {
    let cfg = Config::read_or_construct_default(None)?;
    Ok(Self::with_config(event_tx, cfg))
}
/// Creates an (unactivated) instance with the given configuration and spawns its event loop.
fn with_config(event_tx: mpmc::Sender<Event>, cfg: Config) -> Self {
    Self {
        event_tx,
        cfg,
        us: None,
        el: EventLoop::spawn(),
    }
}
fn activate(&mut self) -> R<()> {
let (port, is_user_supplied) = self
.cfg
.port
.map(|p| (p, true))
.unwrap_or((DEFAULT_PORT_TO_TRY, false));
let ip = self
.cfg
.ip
.unwrap_or_else(|| IpAddr::V4(Ipv4Addr::UNSPECIFIED));
let max_msg_size_allowed = self
.cfg
.max_msg_size_allowed
.map(|size| size as usize)
.unwrap_or(DEFAULT_MAX_ALLOWED_MSG_SIZE);
let idle_timeout_msec = self
.cfg
.idle_timeout_msec
.unwrap_or(DEFAULT_IDLE_TIMEOUT_MSEC);
let keep_alive_interval_msec = self
.cfg
.keep_alive_interval_msec
.unwrap_or(DEFAULT_KEEP_ALIVE_INTERVAL_MSEC);
let our_type = self.cfg.our_type;
let hard_coded_contacts = self.cfg.hard_coded_contacts.clone();
let tx = self.event_tx.clone();
let ((key, cert), our_complete_cert) = {
let our_complete_cert = self
.cfg
.our_complete_cert
.clone()
.unwrap_or_else(Default::default);
(
our_complete_cert.obtain_priv_key_and_cert()?,
our_complete_cert,
)
};
let bootstrap_cache = BootstrapCache::new(hard_coded_contacts, None)?;
self.el.post(move || {
let our_cfg = unwrap!(peer_config::new_our_cfg(
idle_timeout_msec,
keep_alive_interval_msec,
cert,
key
));
let mut ep_builder = quinn::Endpoint::builder();
let _ = ep_builder.listen(our_cfg);
let (dr, ep, incoming_connections) = {
match UdpSocket::bind(&(ip, port)) {
Ok(udp) => unwrap!(ep_builder.with_socket(udp)),
Err(e) => {
if is_user_supplied {
panic!(
"Could not bind to the user supplied port: {}! Error: {:?}- {}",
port, e, e
);
}
info!(
"Failed to bind to port: {} - Error: {:?} - {}. Trying random port.",
DEFAULT_PORT_TO_TRY, e, e
);
unwrap!(ep_builder.bind(&(ip, 0)))
}
}
};
let ctx = Context::new(
tx,
our_complete_cert,
max_msg_size_allowed,
idle_timeout_msec,
keep_alive_interval_msec,
our_type,
bootstrap_cache,
ep,
);
initialise_ctx(ctx);
current_thread::spawn(dr.map_err(|e| warn!("Error in quinn Driver: {:?}", e)));
if our_type != OurType::Client {
listener::listen(incoming_connections);
}
});
Ok(())
}
/// Fetches a copy of our certificate (`cert_der`) from the event-loop context.
///
/// Blocks until the event loop has replied.
///
/// # Panics
///
/// Panics if the reply channel is closed on either side before the value is delivered.
fn our_certificate_der(&mut self) -> Vec<u8> {
    let (der_tx, der_rx) = mpsc::channel();
    self.el.post(move || {
        let cert_der = ctx(|c| c.our_complete_cert.cert_der.clone());
        unwrap!(der_tx.send(cert_der));
    });
    unwrap!(der_rx.recv())
}
fn query_ip_echo_service(&mut self) -> R<SocketAddr> {
let node_info = if let Some(node_info) = self.cfg.hard_coded_contacts.iter().next() {
node_info.clone()
} else {
return Err(Error::NoEndpointEchoServerFound);
};
let echo_server = Peer::Node { node_info };
let (tx, rx) = mpsc::channel();
self.el.post(move || {
ctx_mut(|c| c.our_ext_addr_tx = Some(tx));
communicate::try_write_to_peer(echo_server, WireMsg::EndpointEchoReq, 0)
});
Ok(unwrap!(rx.recv()))
}
/// Flags the connection entry for `peer_addr` (if any) as one where we contacted the peer.
///
/// Accesses the event-loop context via `ctx_mut`; does nothing if no entry exists for the
/// address.
#[inline]
fn set_we_contacted_peer(peer_addr: &SocketAddr) {
    ctx_mut(|c| {
        let conn = match c.connections.get_mut(peer_addr) {
            Some(conn) => conn,
            None => return,
        };
        conn.we_contacted_peer = true;
    })
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::wire_msg::{Handshake, WireMsg};
use crossbeam_channel as mpmc;
use std::collections::HashSet;
use std::iter;
use std::time::Duration;
use test_utils::{new_random_qp2p, rand_node_info};
// Builds an instance with the default configuration and drops it again; the test passes if
// neither step panics.
#[test]
fn dropping_qp2p_handle_gracefully_shutsdown_event_loop() {
    // The receiver is kept alive until after the handle has been dropped.
    let (event_tx, _event_rx) = mpmc::unbounded();
    let qp2p = unwrap!(Builder::new(event_tx).build());
    drop(qp2p);
}
// Checks the echo service: a peer without hard coded contacts cannot query it, while peers that
// know `qp2p0` obtain their address through it. Afterwards a message sent from `qp2p2` to
// `qp2p1` must produce the expected events on both sides.
#[test]
fn echo_service() {
    let (mut qp2p0, _rx) = new_random_qp2p(false, Default::default());

    // Nobody to ask: the query must fail with the dedicated error.
    match qp2p0.query_ip_echo_service() {
        Err(Error::NoEndpointEchoServerFound) => (),
        Err(e) => panic!("{:?} - {}", e, e),
        Ok(_) => panic!("Without Hard Coded Contacts, echo service should not be possible"),
    }

    let qp2p0_info = unwrap!(qp2p0.our_connection_info());
    let qp2p0_port = qp2p0_info.peer_addr.port();

    // Creates a peer whose only hard coded contact is `qp2p0`.
    let new_peer_knowing_qp2p0 = || {
        let mut hcc: HashSet<_> = HashSet::new();
        assert!(hcc.insert(qp2p0_info.clone()));
        new_random_qp2p(true, hcc)
    };

    let (mut qp2p1, rx1) = new_peer_knowing_qp2p0();
    let qp2p1_port = unwrap!(qp2p1.query_ip_echo_service()).port();
    let qp2p1_info = unwrap!(qp2p1.our_connection_info());
    assert_ne!(qp2p0_port, qp2p1_port);
    assert_eq!(qp2p1_port, qp2p1_info.peer_addr.port());

    let (mut qp2p2, rx2) = new_peer_knowing_qp2p0();
    let qp2p2_info = unwrap!(qp2p2.our_connection_info());

    // Discard everything received so far so that only the events below remain.
    while rx1.try_recv().is_ok() {}
    while rx2.try_recv().is_ok() {}

    const TOKEN: u64 = 19923;
    let data = bytes::Bytes::from(vec![12, 13, 14, 253]);
    qp2p2.send(qp2p1_info.clone().into(), data.clone(), TOKEN);

    match unwrap!(rx1.recv()) {
        Event::ConnectedTo { peer } => {
            let expected = Peer::Node {
                node_info: qp2p2_info.clone(),
            };
            assert_eq!(peer, expected)
        }
        x => panic!("Received unexpected event: {:?}", x),
    }

    match unwrap!(rx1.recv()) {
        Event::NewMessage { peer_addr, msg } => {
            assert_eq!(peer_addr, qp2p2_info.peer_addr);
            assert_eq!(msg, data);
        }
        x => panic!("Received unexpected event: {:?}", x),
    }

    match unwrap!(rx2.recv()) {
        Event::SentUserMessage {
            peer_addr,
            msg,
            token,
        } => {
            assert_eq!(peer_addr, qp2p1_info.peer_addr);
            assert_eq!(msg, data);
            assert_eq!(token, TOKEN);
        }
        x => panic!("Received unexpected event: {:?}", x),
    }
}
// Sends one big (400 MiB) and two small messages from `qp2p1` to `qp2p0`, plus one message in
// the opposite direction, and checks on two watcher threads that each side sees the expected
// events. The big message is sent first but is only accepted as the last event on `qp2p0`.
#[test]
fn multistreaming_and_no_head_of_queue_blocking() {
const TEST_TOKEN0: u64 = 293_203;
const TEST_TOKEN1: u64 = 3_435_235;
let (mut qp2p0, rx0) = new_random_qp2p(false, Default::default());
let qp2p0_info = unwrap!(qp2p0.our_connection_info());
// `qp2p1` gets `qp2p0` as its only hard coded contact.
let (mut qp2p1, rx1) = {
let mut hcc: HashSet<_> = Default::default();
assert!(hcc.insert(qp2p0_info.clone()));
new_random_qp2p(true, hcc)
};
let qp2p1_info = unwrap!(qp2p1.our_connection_info());
let qp2p0_addr = qp2p0_info.peer_addr;
let qp2p1_addr = qp2p1_info.peer_addr;
// Payloads; the clones are moved into the watcher threads for comparison.
let big_msg_to_qp2p0 = bytes::Bytes::from(vec![255; 400 * 1024 * 1024]);
let big_msg_to_qp2p0_clone = big_msg_to_qp2p0.clone();
let small_msg0_to_qp2p0 = bytes::Bytes::from(vec![255, 254, 253, 252]);
let small_msg0_to_qp2p0_clone = small_msg0_to_qp2p0.clone();
let small_msg1_to_qp2p0 = bytes::Bytes::from(vec![155, 154, 153, 152]);
let small_msg1_to_qp2p0_clone = small_msg1_to_qp2p0.clone();
let msg_to_qp2p1 = bytes::Bytes::from(vec![120, 129, 2]);
let msg_to_qp2p1_clone0 = msg_to_qp2p1.clone();
let msg_to_qp2p1_clone1 = msg_to_qp2p1.clone();
// Watcher for `qp2p0`: expects `ConnectedTo` followed by four events - three `NewMessage`s
// and exactly one `SentUserMessage`.
let j0 = unwrap!(std::thread::Builder::new()
.name("QuicP2p0-test-thread".to_string())
.spawn(move || {
match rx0.recv() {
Ok(Event::ConnectedTo {
peer: Peer::Node { node_info },
}) => assert_eq!(node_info.peer_addr, qp2p1_addr),
Ok(x) => panic!("Expected Event::ConnectedTo - got {:?}", x),
Err(e) => panic!(
"QuicP2p0 Expected Event::ConnectedTo; got error: {:?} {}",
e, e
),
};
let mut rxd_sent_msg_event = false;
for i in 0..4 {
match rx0.recv() {
Ok(Event::NewMessage { peer_addr, msg }) => {
assert_eq!(peer_addr, qp2p1_addr);
// Only the event at index 3 may be the big message; any earlier `NewMessage` must be
// one of the small ones.
// NOTE(review): this relies on the `SentUserMessage` event not being the last of the
// four events - confirm this ordering is guaranteed.
if i != 3 {
assert!(
msg == small_msg0_to_qp2p0_clone
|| msg == small_msg1_to_qp2p0_clone
);
info!("Smaller message {:?} rxd from {}", &*msg, peer_addr)
} else {
assert_eq!(msg, big_msg_to_qp2p0_clone);
info!("Big message of size {} rxd from {}", msg.len(), peer_addr);
}
}
Ok(Event::SentUserMessage {
peer_addr,
msg,
token,
}) => {
if rxd_sent_msg_event {
panic!("Should have received sent message event only once !");
}
assert_eq!(peer_addr, qp2p1_addr);
assert_eq!(msg, msg_to_qp2p1_clone0);
assert_eq!(token, TEST_TOKEN0);
rxd_sent_msg_event = true;
}
Ok(x) => panic!("Expected Event::NewMessage - got {:?}", x),
Err(e) => panic!(
"QuicP2p0 Expected Event::NewMessage; got error: {:?} {}",
e, e
),
};
}
}));
// Watcher for `qp2p1`: expects `ConnectedTo` followed by four events - `NewMessage`s carrying
// the message from `qp2p0` and at most three `SentUserMessage`s.
let j1 = unwrap!(std::thread::Builder::new()
.name("QuicP2p1-test-thread".to_string())
.spawn(move || {
match rx1.recv() {
Ok(Event::ConnectedTo {
peer: Peer::Node { node_info },
}) => assert_eq!(node_info.peer_addr, qp2p0_addr),
Ok(x) => panic!("Expected Event::ConnectedTo - got {:?}", x),
Err(e) => panic!(
"QuicP2p1 Expected Event::ConnectedTo; got error: {:?} {}",
e, e
),
};
let mut count_of_rxd_sent_msgs: u8 = 0;
for _ in 0..4 {
match rx1.recv() {
Ok(Event::NewMessage { peer_addr, msg }) => {
assert_eq!(peer_addr, qp2p0_addr);
assert_eq!(msg, msg_to_qp2p1_clone1);
}
Ok(Event::SentUserMessage { peer_addr, token, .. }) => {
if count_of_rxd_sent_msgs >=3 {
panic!("Only sent 3 msgs, so cannot rx send success for more than those
!");
}
assert_eq!(peer_addr, qp2p0_addr);
assert_eq!(token, TEST_TOKEN1);
count_of_rxd_sent_msgs += 1;
}
Ok(x) => panic!("Expected Event::NewMessage - got {:?}", x),
Err(e) => panic!(
"QuicP2p1 Expected Event::NewMessage; got error: {:?} {}",
e, e
),
};
}
}));
// The big message is sent first, followed by the small ones; the second small message and the
// message in the opposite direction are sent after a 100 ms pause.
qp2p1.send(qp2p0_info.clone().into(), big_msg_to_qp2p0, TEST_TOKEN1);
qp2p1.send(qp2p0_info.clone().into(), small_msg0_to_qp2p0, TEST_TOKEN1);
std::thread::sleep(Duration::from_millis(100));
qp2p1.send(qp2p0_info.into(), small_msg1_to_qp2p0, TEST_TOKEN1);
qp2p0.send(qp2p1_info.into(), msg_to_qp2p1, TEST_TOKEN0);
// Propagate any assertion failure (panic) from the watcher threads.
unwrap!(j0.join());
unwrap!(j1.join());
}
// A peer configured as `OurType::Node` sends a raw `Handshake::Client` wire message to `qp2p0`.
// The test expects both sides to report exactly one `ConnectedTo` event with a `Peer::Node`,
// no further events, and the respective connection halves to be established.
#[test]
fn double_handshake_node() {
let (mut qp2p0, rx0) = new_random_qp2p(false, Default::default());
let qp2p0_info = unwrap!(qp2p0.our_connection_info());
let (tx1, rx1) = mpmc::unbounded();
let mut malicious_client = unwrap!(Builder::new(tx1)
.with_config(Config {
our_type: OurType::Node,
ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
..Default::default()
})
.build());
// The handshake kind sent here (Client) deliberately differs from the configured type (Node).
malicious_client.send_wire_msg(
qp2p0_info.clone().into(),
WireMsg::Handshake(Handshake::Client),
0,
);
let malicious_client_info = unwrap!(malicious_client.our_connection_info());
// `qp2p0` must see the other side as a node at its real address.
match rx0.recv() {
Ok(Event::ConnectedTo {
peer: Peer::Node { node_info },
}) => {
assert_eq!(node_info.peer_addr, malicious_client_info.peer_addr);
}
r => panic!("Unexpected result {:?}", r),
}
match rx1.recv() {
Ok(Event::ConnectedTo {
peer: Peer::Node { node_info },
}) => {
assert_eq!(node_info.peer_addr, qp2p0_info.peer_addr);
}
r => panic!("Unexpected result {:?}", r),
}
// No additional events may be pending on either side.
match rx0.try_recv() {
Err(mpmc::TryRecvError::Empty) => {}
r => panic!("Unexpected result {:?}", r),
}
match rx1.try_recv() {
Err(mpmc::TryRecvError::Empty) => {}
r => panic!("Unexpected result {:?}", r),
}
// Inspect the connection state on both sides; indexing panics if the entry is missing.
let from_peer_is_established = unwrap!(malicious_client
.connections(move |c| { c[&qp2p0_info.peer_addr].from_peer.is_established() }));
let to_peer_is_established = unwrap!(qp2p0.connections(move |c| {
c[&malicious_client_info.peer_addr].to_peer.is_established()
}));
assert!(from_peer_is_established && to_peer_is_established);
}
// A peer configured as `OurType::Client` sends a raw `Handshake::Node` wire message (with an
// empty certificate) to `qp2p0`. The test expects `qp2p0` to report exactly one `ConnectedTo`
// event with a `Peer::Client`, and the reverse connection halves to be "not needed".
#[test]
fn double_handshake_client() {
let (mut qp2p0, rx0) = new_random_qp2p(false, Default::default());
let qp2p0_info = unwrap!(qp2p0.our_connection_info());
let (tx1, _rx1) = mpmc::unbounded();
let mut malicious_client = unwrap!(Builder::new(tx1)
.with_config(Config {
our_type: OurType::Client,
ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
..Default::default()
})
.build());
// The handshake kind sent here (Node) deliberately differs from the configured type (Client).
malicious_client.send_wire_msg(
qp2p0_info.clone().into(),
WireMsg::Handshake(Handshake::Node { cert_der: vec![] }),
0,
);
let malicious_client_info = unwrap!(malicious_client.our_connection_info());
// `qp2p0` must still classify the other side as a client.
match rx0.recv() {
Ok(Event::ConnectedTo {
peer: Peer::Client { .. },
}) => {}
r => panic!("Unexpected result {:?}", r),
}
// No additional events may be pending on `qp2p0`.
match rx0.try_recv() {
Err(mpmc::TryRecvError::Empty) => {}
r => panic!("Unexpected result {:?}", r),
}
// Inspect the connection state on both sides; indexing panics if the entry is missing.
let from_peer_is_not_needed = unwrap!(malicious_client
.connections(move |c| { c[&qp2p0_info.peer_addr].from_peer.is_not_needed() }));
let to_peer_is_not_needed = unwrap!(qp2p0
.connections(move |c| { c[&malicious_client_info.peer_addr].to_peer.is_not_needed() }));
assert!(from_peer_is_not_needed && to_peer_is_not_needed);
}
// Connecting to 127.0.0.1:1 with an empty certificate must be reported as a
// `ConnectionFailure` event for that address.
#[test]
fn connect_to_fails() {
    let invalid_socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1);
    let (mut qp2p, event_rx) = new_random_qp2p(false, Default::default());

    let unreachable_peer = NodeInfo {
        peer_addr: invalid_socket_addr,
        peer_cert_der: Vec::new(),
    };
    qp2p.connect_to(unreachable_peer);

    let first_event = event_rx.recv();
    match first_event {
        Ok(Event::ConnectionFailure { peer_addr, .. }) => {
            assert_eq!(peer_addr, invalid_socket_addr)
        }
        r => panic!("Unexpected result {:?}", r),
    }
}
// After `peer2` connects to `peer1`, the connection entry that `peer2` holds for `peer1` must
// have `we_contacted_peer` set.
#[test]
fn connect_to_marks_that_we_attempted_to_contact_the_peer() {
    let (mut peer1, _) = new_random_qp2p(false, Default::default());
    let peer1_info = unwrap!(peer1.our_connection_info());
    let peer1_addr = peer1_info.peer_addr;

    let (mut peer2, ev_rx) = new_random_qp2p(false, Default::default());
    peer2.connect_to(peer1_info);

    // Consume events up to and including the first `ConnectedTo` (or until the channel closes).
    let _ = ev_rx.iter().any(|event| match event {
        Event::ConnectedTo { .. } => true,
        _ => false,
    });

    // Read the flag on the event loop, where the context lives.
    let (flag_tx, flag_rx) = mpsc::channel();
    peer2.el.post(move || {
        let flag = ctx(|c| unwrap!(c.connections.get(&peer1_addr)).we_contacted_peer);
        let _ = flag_tx.send(flag);
    });

    assert!(unwrap!(flag_rx.recv()));
}
// A peer created with exactly one hard coded contact must recognise that contact and reject
// any other node.
#[test]
fn is_hard_coded_contact() {
    let known = rand_node_info();
    let stranger = rand_node_info();

    let hard_coded_contacts = iter::once(known.clone()).collect();
    let (peer, _) = new_random_qp2p(false, hard_coded_contacts);

    assert!(peer.is_hard_coded_contact(&known));
    assert!(!peer.is_hard_coded_contact(&stranger));
}
}