#![allow(clippy::style)]
#![allow(clippy::unit_arg)]
#![allow(clippy::useless_format)]
#![allow(clippy::while_let_loop)]
use hippotat::prelude::*;
mod daemon;
mod suser;
mod slocal;
mod sweb;
pub use daemon::Daemoniser;
pub use sweb::{WebRequest, WebResponse, WebResponseBody};
pub use suser::User;
// Command-line options for hippotatd (the server daemon).
// (Deliberately commented with `//`, not `///`: clap turns doc comments
// into --help text, which would change the program's output.)
#[derive(clap::Parser,Debug)]
pub struct Opts {
// Logging options, shared with the rest of the suite (flattened flags).
#[clap(flatten)]
pub log: LogOpts,
// Configuration-source options (flattened flags).
#[clap(flatten)]
pub config: config::Opts,
// Daemonise after startup (ignored when --print-config is given; see main()).
#[clap(long)]
daemon: bool,
// Path to write our process id to, after configuration succeeds.
#[clap(long)]
pidfile: Option<String>,
// Print the named configuration key(s) instead of running the server.
#[clap(long)]
print_config: Option<String>,
}
// Bound on metadata length; reuses the protocol's MAX_OVERHEAD from the
// prelude.  NOTE(review): presumably metadata must fit in the overhead
// budget — confirm against the protocol definition.
pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
// Capacities (in messages) of the bounded mpsc queues created in
// async_main:
// router -> per-client user task (routed packets)
pub const MAXQUEUE_ROUTE2USER: usize = 15;
// router -> local interface (ipif) task
pub const MAXQUEUE_ROUTE2LOCAL: usize = 50;
// web request handler -> per-client user task
pub const MAXQUEUE_WEBREQ2USER: usize = 5;
/// Shared server state; built once in `async_main`, wrapped in an `Arc`,
/// and handed to every task.
#[derive(Debug)]
pub struct Global {
/// The global (non-per-client) part of the instance configuration.
config: config::InstanceConfigGlobal,
/// Sending end of the queue consumed by the local-interface task
/// (`slocal::run`).  NOTE(review): despite the `_rx` suffix this is a
/// `Sender`; the naming is inverted relative to convention.
local_rx: mpsc::Sender<RoutedPacket>,
/// Routing table: client (virtual address) -> its handles/queues.
all_clients: HashMap<ClientName, User>,
}
/// A packet travelling between the router and a consuming task.
pub struct RoutedPacket {
/// The raw packet bytes.
pub data: RoutedPacketData,
}
/// Owned packet payload; boxed slice so the queued value is fixed-size.
pub type RoutedPacketData = Box<[u8]>;
// Capability token gating `route_packet`.  The field is a private unit,
// so a `MayRoute` can only be minted via this module's constructors;
// callers must therefore state where their packet came from.
mod may_route {
#[derive(Clone,Debug)]
pub struct MayRoute(());
impl MayRoute {
/// Mint a token for a packet that arrived from outside hippotatd
/// (the constructor's name documents the provenance claim).
pub fn came_from_outside_hippotatd() -> Self { Self(()) }
}
}
pub use may_route::MayRoute;
/// Route one packet to the appropriate task's queue.
///
/// `transport_conn` names the transport connection the packet arrived on
/// (tracing only); `source` is the originating client, or `None` for
/// locally-originated packets; `daddr` is the destination address.
/// `_may_route` is a capability token: only code holding a `MayRoute`
/// may inject packets into the router.
pub async fn route_packet(global: &Global,
transport_conn: &str, source: Option<&ClientName>,
packet: RoutedPacketData, daddr: IpAddr,
_may_route: MayRoute)
{
let c = &global.config;
let len = packet.len();
// Tracing helper: `how` is the disposition, `why` the routing reason.
// (Fixed: the format string previously read "came=={}", emitting a
// doubled '=' in every trace line.)
let trace = |how: &str, why: &str| {
trace!("{} to={:?} came={} user={} len={} {}",
how, daddr, transport_conn,
match source {
Some(s) => s as &dyn Display,
None => &"local",
},
len, why);
};
// Pick the destination queue:
//  - our own vaddr, or anything outside the virtual network, goes to
//    the local interface task;
//  - the relay address is dropped here (NOTE(review): the leading
//    space in " vrelay" is preserved from the original trace string);
//  - otherwise look the client up by its in-network address.
let (dest, why) =
if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
(Some(&global.local_rx), "via=local")
} else if daddr == c.vrelay {
(None, " vrelay")
} else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
(Some(&client.route), "via=client")
} else {
(None, "no-client")
};
let dest = if let Some(d) = dest { d } else {
trace("discard ", why); return;
};
let packet = RoutedPacket {
data: packet,
};
// Bounded send applies backpressure; Err means the receiver end was
// dropped, i.e. the consuming task has crashed.
match dest.send(packet).await {
Ok(()) => trace("forward", why),
Err(_) => trace("task-crashed!", why),
}
}
fn main() {
let opts = <Opts as clap::Parser>::parse();
let daemon = if opts.daemon && opts.print_config.is_none() {
Some(Daemoniser::phase1())
} else {
None
};
async_main(opts, daemon);
}
/// Tokio entry point: read configuration, spawn all long-running tasks
/// (per-client user tasks, HTTP listeners, the local-interface task),
/// then wait for the first task to die and crash the whole process.
#[tokio::main]
async fn async_main(opts: Opts, daemon: Option<Daemoniser>) {
// Every spawned task is recorded as (join handle, description).  A task
// resolving at all is fatal: its Ok payload is itself an error value.
let mut tasks: Vec<(
JoinHandle<AE>,
String,
)> = vec![];
config::startup(
"hippotatd", LinkEnd::Server,
&opts.config, &opts.log, |server_name, ics|
{
// First startup phase: derive the global config from the instance
// configs, and honour --print-config if requested.
let server_name = server_name.expect("LinkEnd::Server didn't do its job");
let pc = PrintConfigOpt(&opts.print_config);
if ics.is_empty() {
pc.implement(ics)?;
return Ok(None);
}
let global_config = config::InstanceConfigGlobal::from(&ics);
let gc = (&server_name, &global_config);
// Print from the global view when every requested key resolves
// there; otherwise fall back to the per-instance configs.
if pc.keys().all(|k| gc.inspect_key(k).is_some()) {
pc.implement([&gc])?;
} else {
pc.implement(ics)?;
}
Ok(Some(global_config))
}, |global_config, ics| {
// Second startup phase: actually bring the server up.
let global_config = global_config.expect("some instances");
// Write the pidfile, if requested, before daemonisation completes.
if let Some(pidfile_path) = opts.pidfile.as_ref() {
(||{
let mut pidfile = fs::File::create(pidfile_path).context("create")?;
writeln!(pidfile, "{}", process::id()).context("write")?;
pidfile.flush().context("write (flush)")?;
Ok::<_,AE>(())
})().with_context(|| format!("pidfile {:?}", pidfile_path))?;
}
// Start the local IP interface subprocess.
let ipif = Ipif::start(&global_config.ipif, None)?;
let ics = ics.into_iter().map(Arc::new).collect_vec();
// Per client instance, create the two bounded queues its user task
// will consume: web requests, and routed packets.  Senders and
// receivers are unzipped into parallel vectors.
let (client_handles_send, client_handles_recv) = ics.iter()
.map(|_ic| {
let (web_send, web_recv) = mpsc::channel(
MAXQUEUE_WEBREQ2USER
);
let (route_send, route_recv) = mpsc::channel(
MAXQUEUE_ROUTE2USER
);
((web_send, route_send), (web_recv, route_recv))
}).unzip::<_,_,Vec<_>,Vec<_>>();
// Routing table used by route_packet: client address -> handles.
let all_clients = izip!(
&ics,
client_handles_send,
).map(|(ic, (web_send, route_send))| {
(ic.link.client,
User {
ic: ic.clone(),
web: web_send,
route: route_send,
})
}).collect();
// Queue from the router into the local-interface task.
// (NOTE(review): `local_rx_send` is the Sender and `local_tx_recv`
// the Receiver — the rx/tx parts of these names are swapped
// relative to the usual convention.)
let (local_rx_send, local_tx_recv) = mpsc::channel(
MAXQUEUE_ROUTE2LOCAL
);
let global = Arc::new(Global {
config: global_config,
local_rx: local_rx_send,
all_clients,
});
// One task per client, consuming that client's two queues.
for (ic, (web_recv, route_recv)) in izip!(
ics,
client_handles_recv,
) {
let global_ = global.clone();
let ic_ = ic.clone();
tasks.push((tokio::spawn(async move {
suser::run(global_, ic_, web_recv, route_recv)
.await.void_unwrap_err()
}), format!("client {}", &ic)));
}
// One hyper HTTP listener per configured bind address.
for addr in &global.config.addrs {
let global_ = global.clone();
let make_service = hyper::service::make_service_fn(
move |conn: &hyper::server::conn::AddrStream| {
let global_ = global_.clone();
let conn = Arc::new(format!("[{}]", conn.remote_addr()));
// Catch panics in request handlers so a panicking request
// takes the whole daemon down via crash(), instead of
// silently killing just that connection's task.
async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
AssertUnwindSafe(
sweb::handle(conn.clone(), global_.clone(), req)
)
.catch_unwind()
.map(|r| r.unwrap_or_else(|_|{
crash(Err("panicked".into()), "webserver request task")
}))
}) ) }
}
);
let addr = SocketAddr::new(*addr, global.config.port);
let server = hyper::Server::try_bind(&addr)
.context("bind")?
// keep clients' header spelling intact on the wire
.http1_preserve_header_case(true)
.serve(make_service);
info!("listening on {}", &addr);
// The server future completing is always an error condition;
// map a (nominally clean) shutdown into an error too.
let task = tokio::task::spawn(async move {
match server.await {
Ok(()) => anyhow!("shut down?!"),
Err(e) => e.into(),
}
});
tasks.push((task, format!("http server {}", addr)));
}
// Task shovelling packets between the router and the ipif child.
#[allow(clippy::redundant_clone)] let global_ = global.clone();
let ipif = tokio::task::spawn(async move {
slocal::run(global_, local_tx_recv, ipif).await
.void_unwrap_err()
});
tasks.push((ipif, format!("ipif")));
Ok(())
});
// Startup succeeded: let the phase-1 parent process exit.
if let Some(daemon) = daemon {
daemon.complete();
}
// Park until the first task finishes — any completion is fatal.
let (output, died_i, _) = future::select_all(
tasks.iter_mut().map(|e| &mut e.0)
).await;
let task = &tasks[died_i].1;
// JoinError (panic/cancel) is reported as a String; Ok carries the
// error value the task itself returned.
let output = output.map_err(|je| je.to_string());
crash(output, task);
}
pub fn crash(what_happened: Result<AE, String>, task: &str) -> ! {
match what_happened {
Err(je) => error!("task crashed! {}: {}", task, &je),
Ok(e) => error!("task failed! {}: {}", task, &e ),
}
process::exit(12);
}