// hippotat 1.1.7
//
// Asinine HTTP-over-IP
// Documentation
// Copyright 2021-2022 Ian Jackson and contributors to Hippotat
// SPDX-License-Identifier: GPL-3.0-or-later WITH LicenseRef-Hippotat-OpenSSL-Exception
// There is NO WARRANTY.

use super::*;

/// Handles held for one client ("user"): its configuration plus the
/// sending ends of two channels.
///
/// NOTE(review): the senders here presumably pair with the `web` and
/// `routed` receivers passed to [`run`] -- confirm against the code
/// that constructs `User`.
#[derive(Debug)]
pub struct User {
  /// Shared configuration for this client instance.
  pub ic: Arc<InstanceConfig>,
  /// Sending end of a channel carrying `WebRequest`s.
  pub web: mpsc::Sender<WebRequest>,
  /// Sending end of a channel carrying `RoutedPacket`s.
  pub route: mpsc::Sender<RoutedPacket>,
}

/// Main loop for one client (one `InstanceConfig`).
///
/// Each iteration of the loop:
///
///  1. Possibly answers one outstanding web request, filling the
///     response with as many queued downbound packets as fit.
///  2. Trims the downbound queue if it has grown too large.
///  3. Waits for one of: a routed packet arriving on `routed`
///     (queued as downbound data), a web request arriving on `web`
///     (its upbound packets are routed and the request is then held
///     as outstanding), or the earliest outstanding deadline passing.
///
/// Parameters:
///
///  * `global`: passed through to `route_packet` for upbound packets.
///  * `ic`: this client's configuration (limits, link addresses).
///  * `web`: incoming web requests for this client.
///  * `routed`: incoming packets destined for this client.
///
/// # Errors
///
/// The loop contains no `break`, so this function only returns by
/// error: when `routed` is closed ("routers shut down!") or `web` is
/// closed ("webservers all shut down!").  Errors arising while
/// processing an individual web request are not returned; they are
/// sent back as that request's response.
pub async fn run(global: Arc<Global>,
                 ic: Arc<InstanceConfig>,
                 mut web: mpsc::Receiver<WebRequest>,
                 mut routed: mpsc::Receiver<RoutedPacket>)
                 -> Result<Void, AE>
{
  // A web request whose upbound data has been processed and which is
  // now waiting for us to send a response.
  struct Outstanding {
    // Where the eventual `WebResponse` is to be sent.
    reply_to: oneshot::Sender<WebResponse>,
    oi: OutstandingInner,
  }
  // Per-request parameters, taken from the request's metadata after
  // checking against the server's configuration (see `meta!` below).
  #[derive(Debug)]
  struct OutstandingInner {
    // Time after which the request is answered even without data.
    deadline: Instant,
    // Number of requests the client wants to have outstanding.
    target_requests_outstanding: u32,
    // Limit on the size of the response body for this request.
    max_batch_down: u32,
  }
  // Requests awaiting a response, oldest first (new ones are pushed
  // at the back).
  let mut outstanding: VecDeque<Outstanding> = default();
  // Packets waiting to be sent down to the client.
  let mut downbound: PacketQueue<RoutedPacketData> = default();

  // Sends `response` via `reply_to`.  Failure to send is deliberately
  // non-fatal: it is only logged at trace level.
  let try_send_response = |
    reply_to: oneshot::Sender<WebResponse>,
    response: WebResponse
  | {
    reply_to.send(response)
      .unwrap_or_else(|_: WebResponse| {
        /* oh dear */
        trace!("unable to send response back to webserver! user={}",
               &ic.link.client);
      });
  };

  loop {
    // Smallest `max_batch_down` among the outstanding requests, or
    // the configured value if there are none.
    // NOTE(review): `.sat()` is presumably a saturating integer
    // conversion (here to `usize`) -- confirm against its definition.
    let eff_max_batch_down = outstanding
      .iter()
      .map(|o| o.oi.max_batch_down)
      .min()
      .unwrap_or(ic.max_batch_down)
      .sat();
    // Earliest deadline of any outstanding request; `None` if there
    // are no outstanding requests.  Used by the timer branch of the
    // `select!` below.
    let earliest_deadline = outstanding
      .iter()
      .map(|o| o.oi.deadline)
      .min();


    // Choose at most one request to answer in this iteration.
    if let Some(req) = {
      let now = Instant::now();

      if ! downbound.is_empty() {
        // We have data to send: use the oldest outstanding request,
        // if there is one.
        outstanding.pop_front()
      } else if let Some((i,_)) = outstanding.iter().enumerate().find({
        // No data.  Pick the first request which either finds more
        // requests outstanding than its own target, or whose deadline
        // has passed.
        |(_,o)| {
          outstanding.len() > o.oi.target_requests_outstanding.sat()
            ||
          o.oi.deadline < now
        }
      }) {
        // `i` came from enumerating `outstanding`, so it is in range.
        Some(outstanding.remove(i).unwrap())
      } else {
        None
      }
    } {
      // Response body under construction.
      let mut build: FrameQueueBuf = default();

      // Move packets from the front of `downbound` into `build`, for
      // as long as the next packet fits within `eff_max_batch_down`.
      loop {
        let next = if let Some(n) = downbound.peek_front() { n }
                   else { break };
        // Don't add 1 for the ESC since we will strip one
        if build.len() + next.len() >= eff_max_batch_down { break }
        // NOTE(review): `esc_push` presumably prefixes each packet
        // with a separating ESC, per the comments here -- confirm.
        build.esc_push(downbound.pop_front().unwrap());
      }
      if ! build.is_empty() {
        // skip leading ESC
        build.advance(1);
      }

      // Note that `build` may be empty here, e.g. when the request
      // was chosen because its deadline passed.
      let response = WebResponse {
        data: Ok(build),
        warnings: default(),
      };

      try_send_response(req.reply_to, response);
    }

    // Bound on the total length of the downbound queue: enough for
    // `max_requests_outstanding` responses of `eff_max_batch_down`
    // each.  The arithmetic saturates rather than overflowing.
    let max = usize::saturating_mul(
      ic.max_requests_outstanding.sat(),
      eff_max_batch_down,
    ).saturating_add(1 /* one boundary SLIP_ESC which we'll trim */);

    // Over the bound: discard from the front of the queue (the same
    // end that responses are filled from) until we are back under it.
    while downbound.total_len() > max {
      let _ = downbound.pop_front();
      trace!("{} discarding downbound-queue-full", &ic.link);
    }

    select!{
      // With `biased`, the branches are polled in the order written
      // (per the tokio `select!` documentation), so routed packets
      // take priority over web requests, which take priority over
      // the deadline timer.
      biased;

      // A packet has been routed to this client: queue it.
      data = routed.recv() =>
      {
        let data = data.ok_or_else(|| anyhow!("routers shut down!"))?;
        downbound.push_back(data.data);
      },

      // A web request for this client has arrived.
      req = web.recv() =>
      {
        let WebRequest {
          initial, initial_remaining, length_hint, mut body,
          boundary_finder,
          reply_to, conn, mut warnings, may_route,
        } = req.ok_or_else(|| anyhow!("webservers all shut down!"))?;

        // The processing is wrapped in an `async` block so that `?`
        // inside it yields an `Err` which we can report back to the
        // requester (below), rather than returning from `run`.
        match async {

          // Number of bytes of `initial` which have been consumed
          // already; parsing resumes after them.
          let initial_used = initial.len() - initial_remaining;

          // Obtain the request body, passing `max_batch_up` as the
          // limit.
          let whole_request = read_limited_bytes(
            ic.max_batch_up.sat(),
            initial,
            length_hint,
            &mut body
          ).await.context("read request body")?;

          // `meta` is the remainder of the metadata component;
          // `comps` iterates over the components after it.
          let (meta, mut comps) =
            multipart::ComponentIterator::resume_mid_component(
              &whole_request[initial_used..],
              boundary_finder
            ).context("resume parsing body, after auth checks")?;

          let mut meta = MetadataFieldIterator::new(&meta);

          // Reads one metadata field, and defines a local variable
          // named after the config setting `$v`.
          //
          //   $v       name of the setting; `ic.$v` is the server's
          //            value and `$v` becomes the resulting local
          //   $badcmp  optional comparison operator: it is an error
          //            if `client $badcmp server` holds
          //   $ret     expression giving the value for the local
          //   $server, $client, $code
          //            `let $client $code` is run with `$server`
          //            bound, to obtain the client's value
          //
          // Errors get the setting's name attached as context.
          //
          // NOTE(review): the invocations below presumably must be
          // in the same order as the fields appear in the metadata,
          // since each takes its value from the `meta` iterator --
          // confirm against `MetadataFieldIterator`.
          macro_rules! meta {
            { $v:ident, ( $( $badcmp:tt )? ), $ret:expr,
              let $server:ident, $client:ident $($code:tt)*
            } => {
              let $v = (||{
                let $server = ic.$v;
                let $client $($code)*
                $(
                  if $client $badcmp $server {
                    throw!(anyhow!("mismatch: client={:?} {} server={:?}",
                                   $client, stringify!($badcmp), $server));
                  }
                )?
                Ok::<_,AE>($ret)
              })().context(stringify!($v))?;
              //dbg!(&$v);
            }
          }
          // Obtained with `need_parse`; must equal the server's value.
          meta!{
            target_requests_outstanding, ( != ), client,
            let server, client: u32 = meta.need_parse()?;
          }
          // Obtained with `need_parse`, in seconds; must not exceed
          // the server's value.
          meta!{
            http_timeout, ( > ), client,
            let server, client = Duration::from_secs(meta.need_parse()?);
          }
          // Falls back to the server's value; if given, must equal it.
          meta!{
            mtu, ( != ), client,
            let server, client: u32 = meta.parse()?.unwrap_or(server);
          }
          // Falls back to the server's value; never an error: we use
          // the smaller of the client's and the server's values.
          meta!{
            max_batch_down, (), min(client, server),
            let server, client: u32 = meta.parse()?.unwrap_or(server);
          }
          // Falls back to the server's value; must not exceed it.
          meta!{
            max_batch_up, ( > ), client,
            let server, client = meta.parse()?.unwrap_or(server);
          }
          let _ = max_batch_up; // we don't use this further

          // Process the remaining components, which carry the
          // upbound packets.
          while let Some(comp) = comps.next(&mut warnings, PartName::d)? {
            if comp.name != PartName::d {
              // Only a warning: the payload is still processed below.
              warnings.add(&format_args!("unexpected part {:?}", comp.name))?;
            }
            slip::processn(Mime2Slip, mtu, comp.payload, |header| {
              // Header check: the packet's source address must be
              // this client's own address.  Yields the destination.
              let saddr = ip_packet_addr::<false>(header)?;
              if saddr != ic.link.client.0 { throw!(PE::Src(saddr)) }
              let daddr = ip_packet_addr::<true>(header)?;
              Ok(daddr)
            }, |(daddr,packet)| route_packet(
              &global, &conn, Some(&ic.link.client), daddr,
              packet, may_route.clone(),
            ).map(Ok),
              // Error handler: record the error as a warning.
              |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
            ).await?;
          }

          // The timeout runs from now, ie from after the upbound
          // data has been processed.
          let deadline = Instant::now() + http_timeout;

          let oi = OutstandingInner {
            target_requests_outstanding,
            max_batch_down,
            deadline,
          };
          Ok::<_,AE>(oi)
        }.await {
          // Hold the request until we decide to answer it (at the
          // top of the loop).
          Ok(oi) => outstanding.push_back(Outstanding { reply_to, oi }),
          // Processing failed: answer right away with the error and
          // any warnings accumulated so far.
          Err(e) => {
            try_send_response(reply_to, WebResponse {
              data: Err(e),
              warnings,
            });
          },
        }
      }

      // Timer: completes when the earliest deadline arrives, or
      // never if there are no outstanding requests.
      () = async {if let Some(deadline) = earliest_deadline {
        tokio::time::sleep_until(deadline).await;
      } else {
        future::pending().await
      } } =>
      {
        // Nothing to do here: going round the loop again lets the
        // code at the top deal with the expired request.
      }
    }
  }
}