use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use structopt::StructOpt;
use async_std::stream;
use async_std::sync::channel;
use async_std::task;
use futures::prelude::*;
use futures::select;
use tracing::{span, Level};
use bytes::Bytes;
use dsf_core::net::Message as DsfMessage;
use dsf_core::service::{Publisher, ServiceBuilder};
use dsf_core::types::{Address, Id};
use dsf_rpc::{Request as RpcRequest, Response as RpcResponse};
use kad::Config as DhtConfig;
use crate::daemon::*;
use crate::error::Error;
use crate::io::*;
use crate::store::*;
use crate::daemon::Options as DaemonOptions;
pub struct Engine {
dsf: Dsf<WireConnector>,
unix: Unix,
wire: Option<Wire>,
net: Option<Net>,
options: Options,
}
pub const DEFAULT_UNIX_SOCKET: &str = "/tmp/dsf.sock";
pub const DEFAULT_DATABASE_FILE: &str = "/tmp/dsf.db";
pub const DEFAULT_SERVICE: &str = "/tmp/dsf.svc";
#[derive(StructOpt, Debug, Clone, PartialEq)]
pub struct Options {
#[structopt(short = "a", long = "bind-address", default_value = "0.0.0.0:10100")]
pub bind_addresses: Vec<SocketAddr>,
#[structopt(
long = "database-file",
default_value = "/var/dsfd/dsf.db",
env = "DSF_DB_FILE"
)]
pub database_file: String,
#[structopt(
short = "s",
long = "daemon-socket",
default_value = "/var/run/dsfd/dsf.sock",
env = "DSF_SOCK"
)]
pub daemon_socket: String,
#[structopt(long = "no-bootstrap")]
pub no_bootstrap: bool,
#[structopt(flatten)]
pub daemon_options: DaemonOptions,
}
impl Default for Options {
fn default() -> Self {
Self {
bind_addresses: vec![SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
10100,
)],
daemon_socket: DEFAULT_UNIX_SOCKET.to_string(),
database_file: DEFAULT_DATABASE_FILE.to_string(),
no_bootstrap: false,
daemon_options: DaemonOptions {
dht: DhtConfig::default(),
},
}
}
}
impl Options {
pub fn with_suffix(&self, suffix: usize) -> Self {
let bind_addresses = self
.bind_addresses
.iter()
.map(|a| {
let mut a = a.clone();
a.set_port(a.port() + suffix as u16);
a
})
.collect();
Self {
bind_addresses,
daemon_socket: format!("{}.{}", self.daemon_socket, suffix),
database_file: format!("{}.{}", self.database_file, suffix),
no_bootstrap: self.no_bootstrap,
daemon_options: DaemonOptions {
dht: self.daemon_options.dht.clone(),
},
}
}
}
impl Engine {
pub async fn new(options: Options) -> Result<Self, Error> {
let store = Store::new(&options.database_file)?;
let mut service = match store.load_peer_service()? {
Some(s) => {
info!("Loaded existing peer service: {}", s.id());
s
}
None => {
let s = ServiceBuilder::peer().build().unwrap();
info!("Created new peer service: {}", s.id());
s
}
};
let mut buff = vec![0u8; 1025];
let (n, mut page) = service.publish_primary(&mut buff)?;
page.raw = Some(buff[..n].to_vec());
store.set_peer_service(&service, &page)?;
let span = span!(Level::DEBUG, "engine", "{}", service.id());
let _enter = span.enter();
info!("Creating new engine");
info!(
"Creating network connector on addresses: {:?}",
options.bind_addresses
);
let mut net = Net::new();
for addr in &options.bind_addresses {
if let Err(e) = net.bind(NetKind::Udp, *addr).await {
error!("Error binding interface: {:?}", addr);
return Err(e.into());
}
}
info!("Creating unix socket: {}", options.daemon_socket);
let unix = match Unix::new(&options.daemon_socket).await {
Ok(u) => u,
Err(e) => {
error!("Error binding unix socket: {}", options.daemon_socket);
return Err(e.into());
}
};
let wire = Wire::new(service.private_key().unwrap());
let dsf = Dsf::new(
options.daemon_options.clone(),
service,
Arc::new(Mutex::new(store)),
wire.connector(),
)?;
debug!("Engine created!");
Ok(Self {
dsf,
wire: Some(wire),
net: Some(net),
unix,
options,
})
}
pub fn id(&self) -> Id {
self.dsf.id()
}
pub async fn run(&mut self, running: Arc<AtomicBool>) -> Result<(), Error> {
let span = span!(Level::DEBUG, "engine", "{}", self.dsf.id());
let _enter = span.enter();
if !self.options.no_bootstrap {
let mut d = self.dsf.clone();
task::spawn(async move {
task::sleep(Duration::from_secs(2)).await;
let _ = d.bootstrap().await;
});
}
let mut update_timer = stream::interval(Duration::from_secs(30));
let mut tick_timer = stream::interval(Duration::from_secs(1));
let (net_in_tx, mut net_in_rx) = channel(1000);
let (net_out_tx, mut net_out_rx) = channel(1000);
let (mut wire, mut net) = (self.wire.take().unwrap(), self.net.take().unwrap());
let r = running.clone();
let net_dsf = self.dsf.clone();
let _net_handle = task::spawn(async move {
while r.load(Ordering::SeqCst) {
select! {
net_rx = net.next().fuse() => {
trace!("engine::net_rx");
if let Some(m) = net_rx {
let address = m.address.clone();
let message = match wire.handle(m, |id| net_dsf.find_public_key(id)).await {
Ok(Some(v)) => v,
Ok(None) => continue,
Err(e) => {
error!("error decoding network message from: {:?}", address);
continue;
}
};
net_in_tx.send((address, message)).await;
}
},
net_tx = net_out_rx.next().fuse() => {
trace!("engine::net_tx");
if let Some((address, message)) = net_tx {
let net_tx = match wire.handle_outgoing(Address::from(address), message) {
Ok(v) => v,
Err(e) => {
error!("error encoding network message: {:?}", e);
continue;
}
};
if let Err(e) = net.send(net_tx).await {
error!("error sending network message: {:?}", e);
}
}
}
net_tx = wire.next().fuse() => {
trace!("engine::wire_tx");
if let Some(m) = net_tx {
net.send(m).await.unwrap();
}
}
}
}
warn!("Exiting network handler");
});
while running.load(Ordering::SeqCst) {
#[cfg(feature = "profile")]
let _fg = ::flame::start_guard("engine::tick");
select! {
net_rx = net_in_rx.next().fuse() => {
if let Some((address, req)) = net_rx {
let mut dsf = self.dsf.clone();
let resp = match dsf.handle(address, req).await {
Ok(v) => v,
Err(e) => {
error!("error handling DSF request: {:?}", e);
continue;
}
};
net_out_tx.send((address, DsfMessage::Response(resp))).await;
}
},
rpc_rx = self.unix.next().fuse() => {
trace!("engine::unix_rx");
if let Some(m) = rpc_rx {
let mut dsf = self.dsf.clone();
task::spawn(async move {
Self::handle_rpc(&mut dsf, m).await.unwrap();
});
}
},
interval = update_timer.next().fuse() => {
trace!("engine::tick");
if let Some(_i) = interval {
}
},
tick = tick_timer.next().fuse() => {},
}
}
Ok(())
}
async fn handle_rpc(dsf: &mut Dsf<WireConnector>, unix_req: UnixMessage) -> Result<(), Error> {
let req: RpcRequest = serde_json::from_slice(&unix_req.data).unwrap();
let span = span!(Level::TRACE, "rpc", id = req.req_id());
let _enter = span.enter();
let resp_kind = dsf.exec(req.kind()).await?;
let resp = RpcResponse::new(req.req_id(), resp_kind);
let enc = serde_json::to_vec(&resp).unwrap();
let unix_resp = unix_req.response(Bytes::from(enc));
if let Err(e) = unix_resp.send().await {
error!("Error sending RPC response: {:?}", e);
}
Ok(())
}
}