#[macro_use]
extern crate clap;
extern crate env_logger;
extern crate failure;
extern crate futures;
extern crate hex;
extern crate itertools;
#[macro_use]
extern crate log;
#[cfg(unix)]
extern crate syslog;
extern crate tokio;
extern crate tokio_codec;
extern crate tox;
mod cli_config;
use std::fs::{File, OpenOptions};
use std::io::{Error, ErrorKind, Read, Write};
use std::net::{IpAddr, SocketAddr};
#[cfg(unix)]
use std::os::unix::fs::OpenOptionsExt;
use futures::sync::mpsc;
use futures::{future, Future, Sink, Stream};
use futures::future::Either;
use itertools::Itertools;
use log::LevelFilter;
use tokio::executor::thread_pool;
use tokio::net::{TcpListener, UdpSocket, UdpFramed};
use tokio::runtime;
use tox::toxcore::crypto_core::*;
use tox::toxcore::dht::codec::{DecodeError, DhtCodec};
use tox::toxcore::dht::server::{Server as UdpServer};
use tox::toxcore::dht::lan_discovery::LanDiscoverySender;
use tox::toxcore::onion::packet::InnerOnionResponse;
use tox::toxcore::tcp::packet::OnionRequest;
use tox::toxcore::tcp::server::{Server as TcpServer, ServerExt};
#[cfg(unix)]
use syslog::Facility;
use cli_config::*;
fn bind_socket(addr: SocketAddr) -> UdpSocket {
let socket = UdpSocket::bind(&addr).expect("Failed to bind UDP socket");
socket.set_broadcast(true).expect("set_broadcast call failed");
if addr.is_ipv6() {
socket.set_multicast_loop_v6(true).expect("set_multicast_loop_v6 call failed");
}
socket
}
fn save_keys(keys_file: &str, pk: PublicKey, sk: &SecretKey) {
#[cfg(not(unix))]
let mut file = File::create(keys_file).expect("Failed to create the keys file");
#[cfg(unix)]
let mut file = OpenOptions::new()
.create(true)
.write(true)
.mode(0o600)
.open(keys_file)
.expect("Failed to create the keys file");
file.write_all(pk.as_ref()).expect("Failed to save public key to the keys file");
file.write_all(&sk[0..SECRETKEYBYTES]).expect("Failed to save secret key to the keys file");
}
fn load_keys(mut file: File) -> (PublicKey, SecretKey) {
let mut buf = [0; PUBLICKEYBYTES + SECRETKEYBYTES];
file.read_exact(&mut buf).expect("Failed to read keys from the keys file");
let pk = PublicKey::from_slice(&buf[..PUBLICKEYBYTES]).expect("Failed to read public key from the keys file");
let sk = SecretKey::from_slice(&buf[PUBLICKEYBYTES..]).expect("Failed to read secret key from the keys file");
(pk, sk)
}
fn load_or_gen_keys(keys_file: &str) -> (PublicKey, SecretKey) {
match File::open(keys_file) {
Ok(file) => load_keys(file),
Err(ref e) if e.kind() == ErrorKind::NotFound => {
info!("Generating new DHT keys and storing them to '{}'", keys_file);
let (pk, sk) = gen_keypair();
save_keys(keys_file, pk, &sk);
(pk, sk)
},
Err(e) => panic!("Failed to read the keys file: {}", e)
}
}
fn run<F>(future: F, threads_config: ThreadsConfig)
where F: Future<Item = (), Error = Error> + Send + 'static
{
if threads_config == ThreadsConfig::N(1) {
let mut runtime = runtime::current_thread::Runtime::new().expect("Failed to create runtime");
runtime.block_on(future).expect("Execution was terminated with error");
} else {
let mut threadpool_builder = thread_pool::Builder::new();
threadpool_builder.name_prefix("tox-node-");
match threads_config {
ThreadsConfig::N(n) => { threadpool_builder.pool_size(n as usize); },
ThreadsConfig::Auto => { }, }
let mut runtime = runtime::Builder::new()
.threadpool_builder(threadpool_builder)
.build()
.expect("Failed to create runtime");
runtime.block_on(future).expect("Execution was terminated with error");
};
}
struct TcpOnion {
tx: mpsc::UnboundedSender<(OnionRequest, SocketAddr)>,
rx: mpsc::UnboundedReceiver<(InnerOnionResponse, SocketAddr)>,
}
struct UdpOnion {
tx: mpsc::UnboundedSender<(InnerOnionResponse, SocketAddr)>,
rx: mpsc::UnboundedReceiver<(OnionRequest, SocketAddr)>,
}
fn create_onion_streams() -> (TcpOnion, UdpOnion) {
let (udp_onion_tx, udp_onion_rx) = mpsc::unbounded();
let (tcp_onion_tx, tcp_onion_rx) = mpsc::unbounded();
let tcp_onion = TcpOnion {
tx: tcp_onion_tx,
rx: udp_onion_rx,
};
let udp_onion = UdpOnion {
tx: udp_onion_tx,
rx: tcp_onion_rx,
};
(tcp_onion, udp_onion)
}
fn run_tcp(cli_config: &CliConfig, dht_sk: SecretKey, tcp_onion: TcpOnion) -> impl Future<Item = (), Error = Error> {
if cli_config.tcp_addrs.is_empty() {
let tcp_onion_future = tcp_onion.rx
.map_err(|()| Error::from(ErrorKind::UnexpectedEof))
.for_each(|_| future::ok(()));
return Either::A(tcp_onion_future)
}
let mut tcp_server = TcpServer::new();
tcp_server.set_udp_onion_sink(tcp_onion.tx);
let tcp_server_c = tcp_server.clone();
let tcp_server_futures = cli_config.tcp_addrs.iter().map(move |&addr| {
let tcp_server_c = tcp_server_c.clone();
let dht_sk = dht_sk.clone();
let listener = TcpListener::bind(&addr).expect("Failed to bind TCP listener");
tcp_server_c.run(listener, dht_sk)
});
let tcp_server_future = future::select_all(tcp_server_futures)
.map(|_| ())
.map_err(|(e, _, _)| e);
let tcp_onion_future = tcp_onion.rx
.map_err(|()| Error::from(ErrorKind::UnexpectedEof))
.for_each(move |(onion_response, addr)|
tcp_server.handle_udp_onion_response(addr.ip(), addr.port(), onion_response)
);
info!("Running TCP relay on {}", cli_config.tcp_addrs.iter().format(","));
Either::B(tcp_server_future
.join(tcp_onion_future)
.map(|_| ()))
}
fn run_udp(cli_config: &CliConfig, dht_pk: PublicKey, dht_sk: &SecretKey, udp_onion: UdpOnion) -> impl Future<Item = (), Error = Error> {
let udp_addr = if let Some(udp_addr) = cli_config.udp_addr {
udp_addr
} else {
let udp_onion_future = udp_onion.rx
.map_err(|()| Error::from(ErrorKind::UnexpectedEof))
.for_each(|_| future::ok(()));
return Either::A(udp_onion_future)
};
let socket = bind_socket(udp_addr);
let (sink, stream) = UdpFramed::new(socket, DhtCodec).split();
let (tx, rx) = mpsc::unbounded();
let lan_discovery_future = if cli_config.lan_discovery_enabled {
LanDiscoverySender::new(tx.clone(), dht_pk, udp_addr.is_ipv6()).run()
} else {
Box::new(future::empty())
};
let mut server = UdpServer::new(tx, dht_pk, dht_sk.clone());
server.set_bootstrap_info(07032018, cli_config.motd.as_bytes().to_owned());
server.enable_lan_discovery(cli_config.lan_discovery_enabled);
server.set_tcp_onion_sink(udp_onion.tx);
server.enable_ipv6_mode(udp_addr.is_ipv6());
let server_c = server.clone();
let udp_onion_future = udp_onion.rx
.map_err(|()| Error::from(ErrorKind::UnexpectedEof))
.for_each(move |(onion_request, addr)|
server_c.handle_tcp_onion_request(onion_request, addr)
);
if cli_config.bootstrap_nodes.is_empty() {
warn!("No bootstrap nodes!");
}
for &node in &cli_config.bootstrap_nodes {
server.add_initial_bootstrap(node);
}
let server_c = server.clone();
let network_reader = stream.then(future::ok).filter(|event| match event {
Ok(_) => true,
Err(ref e) => {
error!("packet receive error = {:?}", e);
e.as_fail().downcast_ref::<DecodeError>().is_none()
}
}
).then(|event: Result<_, ()>|
event.expect("always ok")
).for_each(move |(packet, addr)| {
trace!("Received packet {:?}", packet);
server_c.handle_packet(packet, addr).or_else(|err| {
error!("Failed to handle packet: {:?}", err);
future::ok(())
})
}).map_err(|e| Error::new(ErrorKind::Other, e.compat()));
let network_writer = rx
.map_err(|()| Error::new(ErrorKind::Other, "rx error"))
.filter(move |&(ref _packet, addr)| !(udp_addr.is_ipv4() && addr.is_ipv6()))
.fold(sink, move |sink, (packet, mut addr)| {
if udp_addr.is_ipv6() {
if let IpAddr::V4(ip) = addr.ip() {
addr = SocketAddr::new(IpAddr::V6(ip.to_ipv6_mapped()), addr.port());
}
}
trace!("Sending packet {:?} to {:?}", packet, addr);
sink.send((packet, addr)).map_err(|e| Error::new(ErrorKind::Other, e.compat()))
})
.map(|_sink| ());
info!("Running DHT server on {}", udp_addr);
Either::B(network_reader
.select(network_writer).map(|_| ()).map_err(|(e, _)| e)
.select(server.run()).map(|_| ()).map_err(|(e, _)| e)
.select(lan_discovery_future).map(|_| ()).map_err(|(e, _)| e)
.join(udp_onion_future).map(|_| ()))
}
fn main() {
if !crypto_init() {
panic!("Crypto initialization failed.");
}
let cli_config = cli_parse();
match cli_config.log_type {
LogType::Stderr => {
env_logger::Builder::from_default_env()
.filter_level(LevelFilter::Info)
.init();
},
LogType::Stdout => {
env_logger::Builder::from_default_env()
.target(env_logger::fmt::Target::Stdout)
.filter_level(LevelFilter::Info)
.init();
},
#[cfg(unix)]
LogType::Syslog => {
syslog::init(Facility::LOG_USER, LevelFilter::Info, None)
.expect("Failed to initialize syslog backend.");
},
LogType::None => { },
}
let (dht_pk, dht_sk) = if let Some(ref sk) = cli_config.sk {
(sk.public_key(), sk.clone())
} else if let Some(ref keys_file) = cli_config.keys_file {
load_or_gen_keys(keys_file)
} else {
panic!("Neither secret key nor keys file is specified")
};
info!("DHT public key: {}", hex::encode(dht_pk.as_ref()).to_uppercase());
let (tcp_onion, udp_onion) = create_onion_streams();
let udp_server_future = run_udp(&cli_config, dht_pk, &dht_sk, udp_onion);
let tcp_server_future = run_tcp(&cli_config, dht_sk, tcp_onion);
let future = udp_server_future.select(tcp_server_future).map(|_| ()).map_err(|(e, _)| e);
run(future, cli_config.threads_config);
}