hippotat 1.1.7

Asinine HTTP-over-IP
Documentation
// Copyright 2021-2022 Ian Jackson and contributors to Hippotat
// SPDX-License-Identifier: GPL-3.0-or-later WITH LicenseRef-Hippotat-OpenSSL-Exception
// There is NO WARRANTY.

#![allow(clippy::style)]

#![allow(clippy::unit_arg)]
#![allow(clippy::useless_format)]
#![allow(clippy::while_let_loop)]

use hippotat::prelude::*;

mod daemon;
mod suser;
mod slocal;
mod sweb;

pub use daemon::Daemoniser;
pub use sweb::{WebRequest, WebResponse, WebResponseBody};
pub use suser::User;

#[derive(clap::Parser,Debug)]
pub struct Opts {
  #[clap(flatten)]
  pub log: LogOpts,

  #[clap(flatten)]
  pub config: config::Opts,

  /// Daemonise
  #[clap(long)]
  daemon: bool,

  /// Write our pid to this file
  #[clap(long)]
  pidfile: Option<String>,

  /// Print config item(s), do not actually run
  ///
  /// Argument is (comma-separated) list of config keys;
  /// values will be printed tab-separated.
  /// The key `pretty` dumps the whole config in a pretty debug format.
  ///
  /// If none of the specified config keys are client-specific,
  /// only one line will be printed.  Otherwise the output will
  /// have one line per client association.
  ///
  /// Additional pseudo-config-keys are recognised:
  /// `client`: our client virtual IP address;
  /// `server`: server's logical name in the config;
  /// `link`: the link name including the `[ ]`.
  #[clap(long)]
  print_config: Option<String>,
}

pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;


// ----- Backpressure discussion -----

// These two kinds of channels are sent blockingly, so this means the
// task which calls route_packet can get this far ahead, before a
// context switch to the receiving task is forced.
pub const MAXQUEUE_ROUTE2USER: usize = 15;
pub const MAXQUEUE_ROUTE2LOCAL: usize = 50;

// This channel is sent with try_send, ie non-blocking.  If the user
// task becomes overloaded, requests will start to be rejected.
pub const MAXQUEUE_WEBREQ2USER: usize = 5;

// The user task prioritises 1. returning requests or discarding data,
// 2. handling data routed to it.  Ie it prefers to drain queues.
//
// The slocal task prioritises handling routed data and writing it
// (synchronously) to the local kernel.  So if the local kernel starts
// blocking, all tasks may end up blocked waiting for things to drain.


#[derive(Debug)]
pub struct Global {
  config: config::InstanceConfigGlobal,
  local_rx: mpsc::Sender<RoutedPacket>,
  all_clients: HashMap<ClientName, User>,
}

pub struct RoutedPacket {
  pub data: RoutedPacketData,
//  pub source: Option<ClientName>, // for eh, tracing, etc.
}

// not MIME data, valid SLIP (checked)
pub type RoutedPacketData = Box<[u8]>;

// loop prevention
// we don't decrement the ttl (naughty) but loops cannot arise
// because only the server has any routing code, and server
// has no internal loops, so worst case is
//  client if -> client -> server -> client' -> client if'
// and the ifs will decrement the ttl.
mod may_route {
  #[derive(Clone,Debug)]
  pub struct MayRoute(());
  impl MayRoute {
    pub fn came_from_outside_hippotatd() -> Self { Self(()) }
  }
}
pub use may_route::MayRoute;

pub async fn route_packet(global: &Global,
                          transport_conn: &str, source: Option<&ClientName>,
                          packet: RoutedPacketData, daddr: IpAddr,
                          _may_route: MayRoute)
{
  let c = &global.config;
  let len = packet.len();
  let trace = |how: &str, why: &str| {
    trace!("{} to={:?} came=={} user={} len={} {}",
           how, daddr, transport_conn,
           match source {
             Some(s) => s as &dyn Display,
             None => &"local",
           },
           len, why);
  };

  let (dest, why) =
    if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
      (Some(&global.local_rx), "via=local")
    } else if daddr == c.vrelay {
      (None, " vrelay")
    } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
      (Some(&client.route), "via=client")
    } else {
      (None, "no-client")
    };

  let dest = if let Some(d) = dest { d } else {
    trace("discard ", why); return;
  };

  let packet = RoutedPacket {
    data: packet,
//    source: source.cloned(),
  };
  match dest.send(packet).await {
    Ok(()) => trace("forward", why),
    Err(_) => trace("task-crashed!", why),
  }
}

fn main() {
  let opts = <Opts as clap::Parser>::parse();

  let daemon = if opts.daemon && opts.print_config.is_none() {
    Some(Daemoniser::phase1())
  } else {
    None
  };

  async_main(opts, daemon);
}

#[tokio::main]
async fn async_main(opts: Opts, daemon: Option<Daemoniser>) {
  let mut tasks: Vec<(
    JoinHandle<AE>,
    String,
  )> = vec![];

  config::startup(
    "hippotatd", LinkEnd::Server,
    &opts.config, &opts.log, |server_name, ics|
  {
    let server_name = server_name.expect("LinkEnd::Server didn't do its job");
    let pc = PrintConfigOpt(&opts.print_config);

    if ics.is_empty() {
      pc.implement(ics)?;
      return Ok(None);
    }

    let global_config = config::InstanceConfigGlobal::from(&ics);
    let gc = (&server_name, &global_config);

    if pc.keys().all(|k| gc.inspect_key(k).is_some()) {
      pc.implement([&gc])?;
    } else {
      pc.implement(ics)?;
    }

    Ok(Some(global_config))

  }, |global_config, ics| {

    let global_config = global_config.expect("some instances");

    if let Some(pidfile_path) = opts.pidfile.as_ref() {
      (||{
        let mut pidfile = fs::File::create(pidfile_path).context("create")?;
        writeln!(pidfile, "{}", process::id()).context("write")?;
        pidfile.flush().context("write (flush)")?;
        Ok::<_,AE>(())
      })().with_context(|| format!("pidfile {:?}", pidfile_path))?;
    }

    let ipif = Ipif::start(&global_config.ipif, None)?;

    let ics = ics.into_iter().map(Arc::new).collect_vec();
    let (client_handles_send, client_handles_recv) = ics.iter()
      .map(|_ic| {
        let (web_send, web_recv) = mpsc::channel(
          MAXQUEUE_WEBREQ2USER
        );
        let (route_send, route_recv) = mpsc::channel(
          MAXQUEUE_ROUTE2USER
        );
        ((web_send, route_send), (web_recv, route_recv))
      }).unzip::<_,_,Vec<_>,Vec<_>>();

    let all_clients = izip!(
      &ics,
      client_handles_send,
    ).map(|(ic, (web_send, route_send))| {
      (ic.link.client,
       User {
         ic: ic.clone(),
         web: web_send,
         route: route_send,
       })
    }).collect();

    let (local_rx_send, local_tx_recv) = mpsc::channel(
      MAXQUEUE_ROUTE2LOCAL
    );

    let global = Arc::new(Global {
      config: global_config,
      local_rx: local_rx_send,
      all_clients,
    });

    for (ic, (web_recv, route_recv)) in izip!(
      ics,
      client_handles_recv,
    ) {
      let global_ = global.clone();
      let ic_ = ic.clone();
      tasks.push((tokio::spawn(async move {
        suser::run(global_, ic_, web_recv, route_recv)
          .await.void_unwrap_err()
      }), format!("client {}", &ic)));
    }

    for addr in &global.config.addrs {
      let global_ = global.clone();
      let make_service = hyper::service::make_service_fn(
        move |conn: &hyper::server::conn::AddrStream| {
          let global_ = global_.clone();
          let conn = Arc::new(format!("[{}]", conn.remote_addr()));
          async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
            AssertUnwindSafe(
              sweb::handle(conn.clone(), global_.clone(), req)
            )
              .catch_unwind()
              .map(|r| r.unwrap_or_else(|_|{
                crash(Err("panicked".into()), "webserver request task")
              }))
          }) ) }
        }
      );

      let addr = SocketAddr::new(*addr, global.config.port);
      let server = hyper::Server::try_bind(&addr)
        .context("bind")?
        .http1_preserve_header_case(true)
        .serve(make_service);
      info!("listening on {}", &addr);
      let task = tokio::task::spawn(async move {
        match server.await {
          Ok(()) => anyhow!("shut down?!"),
          Err(e) => e.into(),
        }
      });
      tasks.push((task, format!("http server {}", addr)));
    }

    #[allow(clippy::redundant_clone)] let global_ = global.clone();
    let ipif = tokio::task::spawn(async move {
      slocal::run(global_, local_tx_recv, ipif).await
        .void_unwrap_err()
    });
    tasks.push((ipif, format!("ipif")));

    Ok(())
  });

  if let Some(daemon) = daemon {
    daemon.complete();
  }

  let (output, died_i, _) = future::select_all(
    tasks.iter_mut().map(|e| &mut e.0)
  ).await;

  let task = &tasks[died_i].1;
  let output = output.map_err(|je| je.to_string());
  crash(output, task);
}

pub fn crash(what_happened: Result<AE, String>, task: &str) -> ! {
  match what_happened {
    Err(je) => error!("task crashed! {}: {}", task, &je),
    Ok(e)   => error!("task failed! {}: {}",   task, &e ),
  }
  process::exit(12);
}