#![allow(clippy::style)]
#![allow(clippy::unit_arg)]
#![allow(clippy::useless_format)]
#![allow(clippy::while_let_loop)]
use hippotat::prelude::*;
mod daemon;
mod suser;
mod slocal;
mod sweb;
pub use daemon::Daemoniser;
pub use sweb::{WebRequest, WebResponse, WebResponseBody};
pub use suser::User;
#[derive(clap::Parser,Debug)]
pub struct Opts {
#[clap(flatten)]
pub log: LogOpts,
#[clap(flatten)]
pub config: config::CommonOpts,
#[clap(long)]
daemon: bool,
#[clap(long)]
pidfile: Option<String>,
#[clap(long)]
print_config: Option<String>,
}
pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
pub const MAXQUEUE_ROUTE2USER: usize = 15;
pub const MAXQUEUE_ROUTE2LOCAL: usize = 50;
pub const MAXQUEUE_WEBREQ2USER: usize = 5;
#[derive(Debug)]
pub struct Global {
config: config::InstanceConfigGlobal,
local_rx: mpsc::Sender<RoutedPacket>,
all_clients: HashMap<ClientName, User>,
}
pub struct RoutedPacket {
pub data: RoutedPacketData,
}
pub type RoutedPacketData = Box<[u8]>;
mod may_route {
#[derive(Clone,Debug)]
pub struct MayRoute(());
impl MayRoute {
pub fn came_from_outside_hippotatd() -> Self { Self(()) }
}
}
pub use may_route::MayRoute;
pub async fn route_packet(global: &Global,
transport_conn: &str, source: Option<&ClientName>,
packet: RoutedPacketData, daddr: IpAddr,
_may_route: MayRoute)
{
let c = &global.config;
let len = packet.len();
let trace = |how: &str, why: &str| {
trace!("{} to={:?} came=={} user={} len={} {}",
how, daddr, transport_conn,
match source {
Some(s) => s as &dyn Display,
None => &"local",
},
len, why);
};
let (dest, why) =
if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
(Some(&global.local_rx), "via=local")
} else if daddr == c.vrelay {
(None, " vrelay")
} else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
(Some(&client.route), "via=client")
} else {
(None, "no-client")
};
let dest = if let Some(d) = dest { d } else {
trace("discard ", why); return;
};
let packet = RoutedPacket {
data: packet,
};
match dest.send(packet).await {
Ok(()) => trace("forward", why),
Err(_) => trace("task-crashed!", why),
}
}
fn main() {
let opts = <Opts as clap::Parser>::parse();
let daemon = if opts.daemon && opts.print_config.is_none() {
Some(Daemoniser::phase1())
} else {
None
};
async_main(opts, daemon);
}
#[tokio::main]
async fn async_main(opts: Opts, daemon: Option<Daemoniser>) {
let mut tasks: Vec<(
JoinHandle<AE>,
String,
)> = vec![];
config::startup(
"hippotatd", LinkEnd::Server,
&opts.config, &opts.log, |server_name, ics|
{
let server_name = server_name.expect("LinkEnd::Server didn't do its job");
let pc = PrintConfigOpt(&opts.print_config);
if ics.is_empty() {
pc.implement(ics)?;
return Ok(None);
}
let global_config = config::InstanceConfigGlobal::from(ics);
let gc = (&server_name, &global_config);
if pc.keys().all(|k| gc.inspect_key(k).is_some()) {
pc.implement([&gc])?;
} else {
pc.implement(ics)?;
}
Ok(Some(global_config))
}, |global_config, ics| async {
let global_config = global_config.expect("some instances");
if let Some(pidfile_path) = opts.pidfile.as_ref() {
(||{
let mut pidfile = fs::File::create(pidfile_path).context("create")?;
writeln!(pidfile, "{}", process::id()).context("write")?;
pidfile.flush().context("write (flush)")?;
Ok::<_,AE>(())
})().with_context(|| format!("pidfile {:?}", pidfile_path))?;
}
let ipif = Ipif::start(&global_config.ipif, None)?;
let ics = ics.into_iter().map(Arc::new).collect_vec();
let (client_handles_send, client_handles_recv) = ics.iter()
.map(|_ic| {
let (web_send, web_recv) = mpsc::channel(
MAXQUEUE_WEBREQ2USER
);
let (route_send, route_recv) = mpsc::channel(
MAXQUEUE_ROUTE2USER
);
((web_send, route_send), (web_recv, route_recv))
}).unzip::<_,_,Vec<_>,Vec<_>>();
let all_clients = izip!(
&ics,
client_handles_send,
).map(|(ic, (web_send, route_send))| {
(ic.link.client,
User {
ic: ic.clone(),
web: web_send,
route: route_send,
})
}).collect();
let (local_rx_send, local_tx_recv) = mpsc::channel(
MAXQUEUE_ROUTE2LOCAL
);
let global = Arc::new(Global {
config: global_config,
local_rx: local_rx_send,
all_clients,
});
let max_buffer = chain!(
[16384], ics.iter().map(|ic| {
[ic.max_batch_up,
ic.max_batch_down]
}).flatten(),
).max().expect("not empty since we have at least one [item]")
.try_into().unwrap_or_else(|_: TryFromIntError| usize::MAX);
for (ic, (web_recv, route_recv)) in izip!(
ics,
client_handles_recv,
) {
let global_ = global.clone();
let ic_ = ic.clone();
tasks.push((tokio::spawn(async move {
suser::run(global_, ic_, web_recv, route_recv)
.await.void_unwrap_err()
}), format!("client {}", &ic)));
}
let listeners = {
let mut listeners = vec![];
for saddr in &global.config.addrs {
let saddr = SocketAddr::new(*saddr, global.config.port);
let listener = tokio::net::TcpListener::bind(saddr)
.await
.with_context(|| format!("bind {}", saddr))?;
listeners.push((saddr, listener));
}
listeners
};
for (saddr, listener) in listeners {
info!("listening on {}", &saddr);
let global = global.clone();
let task = tokio::task::spawn(async move {
loop {
let (conn, caddr) = match listener.accept().await {
Ok(y) => y,
Err(e) => { debug!("{saddr}: listen error: {e:#}"); continue; }
};
let caddr = Arc::new(format!("[{caddr}]"));
let service = hyper::service::service_fn({
let global = global.clone();
let caddr = caddr.clone();
move |req| {
let global = global.clone();
let caddr = caddr.clone();
async move {
AssertUnwindSafe(
sweb::handle(caddr, global, req)
)
.catch_unwind().await
.unwrap_or_else(|_| {
crash(Err("panicked".into()), "webserver request task")
})
}
}
});
let conn = hyper_util::rt::tokio::TokioIo::new(conn);
let conn_fut = hyper::server::conn::http1::Builder::new()
.half_close(true)
.title_case_headers(true)
.max_buf_size(max_buffer)
.serve_connection(conn, service);
tokio::task::spawn(async move {
match conn_fut.await {
Ok(()) => {},
Err(e) => trace!("{}: client connection from {} failed: {:#}",
saddr, caddr, e),
};
});
}
});
tasks.push((task, format!("http server {}", saddr)));
}
#[allow(clippy::redundant_clone)] let global_ = global.clone();
let ipif = tokio::task::spawn(async move {
slocal::run(global_, local_tx_recv, ipif).await
.void_unwrap_err()
});
tasks.push((ipif, format!("ipif")));
Ok(())
}).await;
if let Some(daemon) = daemon {
daemon.complete();
}
let (output, died_i, _) = future::select_all(
tasks.iter_mut().map(|e| &mut e.0)
).await;
let task = &tasks[died_i].1;
let output = output.map_err(|je| je.to_string());
crash(output, task);
}
pub fn crash(what_happened: Result<AE, String>, task: &str) -> ! {
match what_happened {
Err(je) => error!("task crashed! {}: {}", task, &je),
Ok(e) => error!("task failed! {}: {}", task, &e ),
}
process::exit(12);
}
#[test]
fn verify_cli() {
hippotat::utils::verify_cli::<Opts>();
}