// hippotat 1.2.2
//
// Asinine HTTP-over-IP
// Documentation
// Copyright 2021-2022 Ian Jackson and contributors to Hippotat
// SPDX-License-Identifier: GPL-3.0-or-later WITH LicenseRef-Hippotat-OpenSSL-Exception
// There is NO WARRANTY.

#![allow(clippy::style)]

#![allow(clippy::unit_arg)]
#![allow(clippy::useless_format)]
#![allow(clippy::while_let_loop)]

use hippotat::prelude::*;

mod daemon;
mod suser;
mod slocal;
mod sweb;

pub use daemon::Daemoniser;
pub use sweb::{WebRequest, WebResponse, WebResponseBody};
pub use suser::User;

// Command line options for the server executable; parsed in `main`
// via `clap::Parser`.
//
// Maintainer notes in this struct are plain `//` comments on purpose:
// `///` comments on a clap-derived struct feed into the generated help.
#[derive(clap::Parser,Debug)]
pub struct Opts {
  // Logging options, flattened into this command line.
  #[clap(flatten)]
  pub log: LogOpts,

  // Configuration options, flattened into this command line.
  #[clap(flatten)]
  pub config: config::CommonOpts,

  // Only acted on when --print-config is absent (see `main`).
  /// Daemonise
  #[clap(long)]
  daemon: bool,

  // Written by `async_main` during startup, before any task is spawned.
  /// Write our pid to this file
  #[clap(long)]
  pidfile: Option<String>,

  /// Print config item(s), do not actually run
  ///
  /// Argument is (comma-separated) list of config keys;
  /// values will be printed tab-separated.
  /// The key `pretty` dumps the whole config in a pretty debug format.
  ///
  /// If none of the specified config keys are client-specific,
  /// only one line will be printed.  Otherwise the output will
  /// have one line per client association.
  ///
  /// Additional pseudo-config-keys are recognised:
  /// `client`: our client virtual IP address;
  /// `server`: server's logical name in the config;
  /// `link`: the link name including the `[ ]`.
  #[clap(long)]
  print_config: Option<String>,
}

// Same value as MAX_OVERHEAD, which is defined outside this file.
// NOTE(review): presumably the upper bound on the size of per-request
// metadata -- confirm against the code that uses it.
pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;


// ----- Backpressure discussion -----

// These two kinds of channels are sent blockingly, so this means the
// task which calls route_packet can get this far ahead, before a
// context switch to the receiving task is forced.
//
// MAXQUEUE_ROUTE2USER is the capacity of each client's route channel;
// MAXQUEUE_ROUTE2LOCAL is the capacity of the single channel whose
// receiving end is given to the slocal task.  Both kinds of channel
// are created in async_main.
pub const MAXQUEUE_ROUTE2USER: usize = 15;
pub const MAXQUEUE_ROUTE2LOCAL: usize = 50;

// This channel is sent with try_send, ie non-blocking.  If the user
// task becomes overloaded, requests will start to be rejected.
//
// It is the capacity of each client's web request channel, which is
// also created in async_main.
pub const MAXQUEUE_WEBREQ2USER: usize = 5;

// The user task prioritises 1. returning requests or discarding data,
// 2. handling data routed to it.  Ie it prefers to drain queues.
//
// The slocal task prioritises handling routed data and writing it
// (synchronously) to the local kernel.  So if the local kernel starts
// blocking, all tasks may end up blocked waiting for things to drain.


/// State shared by the server's tasks.
///
/// Built once in `async_main` and handed to each task inside an `Arc`.
#[derive(Debug)]
pub struct Global {
  /// Server-wide configuration.
  config: config::InstanceConfigGlobal,
  /// Sending end of the channel whose receiving end is run by the
  /// `slocal` task; `route_packet` sends "via=local" packets here.
  local_rx: mpsc::Sender<RoutedPacket>,
  /// Per-client state, keyed by client name; `route_packet` looks up
  /// the destination address here to find the client's route channel.
  all_clients: HashMap<ClientName, User>,
}

/// One packet as queued by `route_packet` onto a client's route
/// channel or onto the local channel.
pub struct RoutedPacket {
  /// The packet contents.
  pub data: RoutedPacketData,
//  pub source: Option<ClientName>, // for eh, tracing, etc.
}

// Payload type of RoutedPacket: an owned boxed byte slice.
// not MIME data, valid SLIP (checked)
pub type RoutedPacketData = Box<[u8]>;

// loop prevention
// we don't decrement the ttl (naughty) but loops cannot arise
// because only the server has any routing code, and server
// has no internal loops, so worst case is
//  client if -> client -> server -> client' -> client if'
// and the ifs will decrement the ttl.
mod may_route {
  /// Token which `route_packet` demands from its callers.
  ///
  /// The only field is private to this module, so code elsewhere can
  /// obtain a value solely through `came_from_outside_hippotatd`
  /// (or by cloning an existing token).
  #[derive(Debug, Clone)]
  pub struct MayRoute(());

  impl MayRoute {
    /// Make a token; by calling this, the caller asserts what the
    /// function name says about the packet being routed.
    pub fn came_from_outside_hippotatd() -> MayRoute {
      MayRoute(())
    }
  }
}
pub use may_route::MayRoute;

/// Route one packet according to its destination address `daddr`.
///
/// * `daddr` equal to our own virtual address, or outside every
///   configured virtual network: queued for the local task.
/// * `daddr` equal to the relay address: discarded.
/// * `daddr` naming a known client: queued on that client's route channel.
/// * anything else: discarded.
///
/// `transport_conn` and `source` are only used in the trace message.
/// `_may_route` is not inspected; requiring it makes callers obtain
/// a `MayRoute` token first.  The outcome (forwarded, discarded, or
/// receiving task gone) is reported via `trace!` only.
pub async fn route_packet(global: &Global,
                          transport_conn: &str, source: Option<&ClientName>,
                          packet: RoutedPacketData, daddr: IpAddr,
                          _may_route: MayRoute)
{
  let cfg = &global.config;
  let packet_len = packet.len();

  // Emits the single trace line describing what became of the packet.
  // The `&dyn Display` is built inside the closure, on each call,
  // so that it is never held across the `.await` below.
  let report = |how: &str, why: &str| {
    trace!("{} to={:?} came=={} user={} len={} {}",
           how, daddr, transport_conn,
           match source {
             Some(s) => s as &dyn Display,
             None => &"local",
           },
           packet_len, why);
  };

  // Addressed to us, or to somewhere outside all of our virtual networks?
  let for_local = daddr == cfg.vaddr
    || cfg.vnetwork.iter().all(|net| !net.contains(&daddr));

  let (dest, why) = if for_local {
    (Some(&global.local_rx), "via=local")
  } else if daddr == cfg.vrelay {
    (None, " vrelay")
  } else {
    match global.all_clients.get(&ClientName(daddr)) {
      Some(client) => (Some(&client.route), "via=client"),
      None => (None, "no-client"),
    }
  };

  let dest = match dest {
    Some(d) => d,
    None => {
      report("discard ", why);
      return;
    }
  };

  // (A `source` field is not currently carried in RoutedPacket.)
  let routed = RoutedPacket { data: packet };

  // This send waits for queue space; an error means the receiver is gone.
  let how = match dest.send(routed).await {
    Ok(()) => "forward",
    Err(_) => "task-crashed!",
  };
  report(how, why);
}

/// Program entry point: parse the command line, perform the first
/// phase of daemonisation if appropriate, then run `async_main`.
fn main() {
  let opts = <Opts as clap::Parser>::parse();

  // With --print-config we never daemonise, even if --daemon was given.
  let printing_config = opts.print_config.is_some();
  let daemon = (opts.daemon && !printing_config)
    .then(Daemoniser::phase1);

  async_main(opts, daemon);
}

#[tokio::main]
async fn async_main(opts: Opts, daemon: Option<Daemoniser>) {
  let mut tasks: Vec<(
    JoinHandle<AE>,
    String,
  )> = vec![];

  config::startup(
    "hippotatd", LinkEnd::Server,
    &opts.config, &opts.log, |server_name, ics|
  {
    let server_name = server_name.expect("LinkEnd::Server didn't do its job");
    let pc = PrintConfigOpt(&opts.print_config);

    if ics.is_empty() {
      pc.implement(ics)?;
      return Ok(None);
    }

    let global_config = config::InstanceConfigGlobal::from(ics);
    let gc = (&server_name, &global_config);

    if pc.keys().all(|k| gc.inspect_key(k).is_some()) {
      pc.implement([&gc])?;
    } else {
      pc.implement(ics)?;
    }

    Ok(Some(global_config))

  }, |global_config, ics| async {

    let global_config = global_config.expect("some instances");

    if let Some(pidfile_path) = opts.pidfile.as_ref() {
      (||{
        let mut pidfile = fs::File::create(pidfile_path).context("create")?;
        writeln!(pidfile, "{}", process::id()).context("write")?;
        pidfile.flush().context("write (flush)")?;
        Ok::<_,AE>(())
      })().with_context(|| format!("pidfile {:?}", pidfile_path))?;
    }

    let ipif = Ipif::start(&global_config.ipif, None)?;

    let ics = ics.into_iter().map(Arc::new).collect_vec();
    let (client_handles_send, client_handles_recv) = ics.iter()
      .map(|_ic| {
        let (web_send, web_recv) = mpsc::channel(
          MAXQUEUE_WEBREQ2USER
        );
        let (route_send, route_recv) = mpsc::channel(
          MAXQUEUE_ROUTE2USER
        );
        ((web_send, route_send), (web_recv, route_recv))
      }).unzip::<_,_,Vec<_>,Vec<_>>();

    let all_clients = izip!(
      &ics,
      client_handles_send,
    ).map(|(ic, (web_send, route_send))| {
      (ic.link.client,
       User {
         ic: ic.clone(),
         web: web_send,
         route: route_send,
       })
    }).collect();

    let (local_rx_send, local_tx_recv) = mpsc::channel(
      MAXQUEUE_ROUTE2LOCAL
    );

    let global = Arc::new(Global {
      config: global_config,
      local_rx: local_rx_send,
      all_clients,
    });

    let max_buffer = chain!(
      [16384], // hyper demands at least 8192
      ics.iter().map(|ic| {
        [ic.max_batch_up,
         ic.max_batch_down]
      }).flatten(),
    ).max().expect("not empty since we have at least one [item]")
      .try_into().unwrap_or_else(|_: TryFromIntError| usize::MAX);

    for (ic, (web_recv, route_recv)) in izip!(
      ics,
      client_handles_recv,
    ) {
      let global_ = global.clone();
      let ic_ = ic.clone();
      tasks.push((tokio::spawn(async move {
        suser::run(global_, ic_, web_recv, route_recv)
          .await.void_unwrap_err()
      }), format!("client {}", &ic)));
    }

    let listeners = {
      let mut listeners = vec![];
      for saddr in &global.config.addrs {
        let saddr = SocketAddr::new(*saddr, global.config.port);
        let listener = tokio::net::TcpListener::bind(saddr)
          .await
          .with_context(|| format!("bind {}", saddr))?;
        listeners.push((saddr, listener));
      }
      listeners
    };

    for (saddr, listener) in listeners {
      info!("listening on {}", &saddr);
      let global = global.clone();

      let task = tokio::task::spawn(async move {
        loop {
          let (conn, caddr) = match listener.accept().await {
            Ok(y) => y,
            Err(e) => { debug!("{saddr}: listen error: {e:#}"); continue; }
          };
          let caddr = Arc::new(format!("[{caddr}]"));

          let service = hyper::service::service_fn({
            let global = global.clone();
            let caddr = caddr.clone();
            move |req| {
              let global = global.clone();
              let caddr = caddr.clone();
              async move {
                AssertUnwindSafe(
                  sweb::handle(caddr, global, req)
                )
                  .catch_unwind().await
                  .unwrap_or_else(|_| {
                    crash(Err("panicked".into()), "webserver request task")
                  })
              }
            }
          });

          let conn = hyper_util::rt::tokio::TokioIo::new(conn);

          let conn_fut = hyper::server::conn::http1::Builder::new()
            .half_close(true)
            .title_case_headers(true)
            .max_buf_size(max_buffer)
            .serve_connection(conn, service);

          tokio::task::spawn(async move {
            match conn_fut.await {
              Ok(()) => {},
              Err(e) => trace!("{}: client connection from {} failed: {:#}",
                               saddr, caddr, e),
            };
          });
        }
      });
      tasks.push((task, format!("http server {}", saddr)));
    }

    #[allow(clippy::redundant_clone)] let global_ = global.clone();
    let ipif = tokio::task::spawn(async move {
      slocal::run(global_, local_tx_recv, ipif).await
        .void_unwrap_err()
    });
    tasks.push((ipif, format!("ipif")));

    Ok(())
  }).await;

  if let Some(daemon) = daemon {
    daemon.complete();
  }

  let (output, died_i, _) = future::select_all(
    tasks.iter_mut().map(|e| &mut e.0)
  ).await;

  let task = &tasks[died_i].1;
  let output = output.map_err(|je| je.to_string());
  crash(output, task);
}

pub fn crash(what_happened: Result<AE, String>, task: &str) -> ! {
  match what_happened {
    Err(je) => error!("task crashed! {}: {}", task, &je),
    Ok(e)   => error!("task failed! {}: {}",   task, &e ),
  }
  process::exit(12);
}

// Runs the shared command line check from hippotat::utils on `Opts`.
#[test]
fn verify_cli() {
  use hippotat::utils::verify_cli as check_cli;
  check_cli::<Opts>();
}