merfolk_backend_http 0.1.0

A HTTP `Backend` for merfolk.
Documentation
use merfolk::{interfaces::Backend, Call, Reply};

use anyhow::Result;
use thiserror::Error;

use hyper::{
  client::{connect::dns::GaiResolver, HttpConnector},
  Body, Client, Method, Request, StatusCode,
};

use hyper::{
  http::Uri,
  service::{make_service_fn, service_fn},
  Response, Server,
};
use std::{fmt::Debug, net::SocketAddr, sync::Arc};
use tokio::{runtime::Runtime, sync};

use log::{debug, info, trace};

#[derive(Debug, Error)]
pub enum Error {
  #[error("serializing failed: {0}")]
  Serialize(#[source] serde_json::Error),
  #[error("deserializing failed {0}")]
  Deserialize(#[source] serde_json::Error),
  #[error("no speak provided in init()")]
  NoSpeak,
  #[error("error building request: {0}")]
  RequestBuilder(#[source] hyper::http::Error),
  #[error("error parsing body bytes: {0}")]
  ParseResponseBodyBytes(#[source] hyper::Error),
  #[error("error parsing body: {0}")]
  ParseResponseBody(#[from] std::string::FromUtf8Error),
  #[error("error while sending request: {0}")]
  ClientRequest(#[source] hyper::Error),
  #[error("request failed with statuscode {status}")]
  FailedRequest { status: StatusCode },
  #[error("no listen provided in init()")]
  NoListen,
  #[error("no receiver was degistered by init()")]
  NoReceiver,
  #[error("error binding server: {0}")]
  BindServer(#[from] hyper::Error),
  #[error("no procedure header in request: {0}")]
  NoProcedureHeader(#[source] hyper::http::Error),
  #[error("could not create runtime: {0}")]
  RuntimeCreation(#[from] std::io::Error),
  #[error("already started")]
  AlreadyStarted,
  #[error("not started")]
  NotStarted,
  #[error("could not send stoping message")]
  Shutdown,
}

#[derive(derive_builder::Builder)]
#[builder(pattern = "owned")]
pub struct Http {
  #[builder(setter(into, strip_option, name = "speak_setter"), private, default = "None")]
  speak: Option<(Uri, Client<HttpConnector<GaiResolver>, Body>)>,

  #[builder(setter(into, strip_option), default = "None")]
  listen: Option<SocketAddr>,

  #[allow(clippy::type_complexity)]
  #[builder(private, default = "None")]
  receiver: Option<Arc<dyn Fn(Call<String>) -> Result<Reply<String>> + Send + Sync>>,

  #[builder(private, default = "Runtime::new().map_err(Error::RuntimeCreation).map_err(|e| e.to_string())?")]
  runtime: Runtime,

  #[builder(private, default = "None")]
  shutdown: Option<sync::oneshot::Sender<()>>,
}

impl HttpBuilder {
  pub fn speak(self, value: Uri) -> Self {
    self.speak_setter((value, Client::new()))
  }
}

impl Http {
  pub fn builder() -> HttpBuilder {
    HttpBuilder::default()
  }
}

impl Debug for Http {
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
    f.debug_struct("Http")
      .field("speak", &self.speak)
      .field("listen", &self.listen)
      .field("runtime", &self.runtime)
      .finish()
  }
}

impl Http {
  pub fn start(&mut self) -> Result<()> {
    trace!("start Http Backend");

    if self.shutdown.is_some() {
      return Err(Error::AlreadyStarted.into());
    }

    let (tx, rx) = tokio::sync::oneshot::channel::<()>();

    self.shutdown = Some(tx);

    let listen = self.listen.ok_or(Error::NoListen)?;

    let receiver = Arc::clone(self.receiver.as_ref().ok_or(Error::NoReceiver)?);

    self.runtime.spawn(async move {
      trace!("spawn listener");

      let receiver = receiver.clone();

      Server::bind(&listen)
        .serve(make_service_fn(move |_| {
          trace!("serve connection");

          let receiver = receiver.clone();
          async move {
            Ok::<_, hyper::Error>(service_fn(move |request: Request<Body>| {
              trace!("run service_fn");

              debug!("{:?}", &listen);

              info!("received incomming call");

              let receiver = receiver.clone();
              async move {
                let procedure = if let Some(procedure) = request.headers().get("Procedure") {
                  match procedure.to_str() {
                    Ok(procedure) => procedure.to_owned(),
                    Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),
                  }
                } else {
                  return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from("No Procedure provided"));
                };

                let body_bytes = match hyper::body::to_bytes(request.into_body()).await {
                  Ok(body_bytes) => body_bytes,
                  Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),
                };

                let body = match String::from_utf8(body_bytes.to_vec()) {
                  Ok(body) => body,
                  Err(e) => return Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(e.to_string())),
                };

                debug!("call Call {{ procedure: {:?}, payload: {:?} }}", &procedure, &body);
                let reply = receiver(crate::Call { procedure, payload: body });

                match reply {
                  Err(e) => Response::builder().status(StatusCode::BAD_REQUEST).body(Body::from(format!("{:?}", e))),

                  Ok(reply) => {
                    debug!("reply Reply {{ payload: {:?} }}", &reply.payload);
                    Response::builder().status(StatusCode::OK).body(Body::from(reply.payload))
                  }
                }
              }
            }))
          }
        }))
        .with_graceful_shutdown(async {
          rx.await.ok();
        })
        .await
        .unwrap();
    });
    Ok(())
  }

  pub fn stop(&mut self) -> Result<()> {
    trace!("stop http backend");
    self.shutdown.take().ok_or(Error::NotStarted)?.send(()).map_err(|_| Error::Shutdown.into())
  }
}

impl Backend for Http {
  type Intermediate = String;

  fn register<T>(&mut self, receiver: T) -> Result<()>
  where
    T: Fn(Call<Self::Intermediate>) -> Result<Reply<Self::Intermediate>> + Send + Sync + 'static,
  {
    trace!("register receiver");

    self.receiver = Some(Arc::new(move |call: Call<String>| {
      trace!("run receiver");

      debug!("calling receiver");
      receiver(call)
    }));

    if let Some(err) = self.start().err().map(|e| e.downcast::<Error>()) {
      match err {
        Ok(err) => match err {
          Error::AlreadyStarted | Error::NoListen | Error::NoReceiver => {}
          err => return Err(err.into()),
        },
        Err(err) => return Err(err),
      }
    };

    Ok(())
  }

  fn call(&mut self, call: Call<Self::Intermediate>) -> Result<Reply<Self::Intermediate>> {
    trace!("call backend");

    debug!("{:?}", &self.speak);

    info!("received outgoing call");

    match &self.speak {
      None => Err(Error::NoSpeak.into()),

      Some(speak) => self.runtime.block_on(async {
        let request = Request::builder()
          .method(Method::POST)
          .uri(&speak.0)
          .header("Procedure", &call.procedure)
          .body(Body::from(call.payload.clone()))
          .map_err(Error::RequestBuilder)?;

        debug!("request {:?}", &request);
        let response = speak.1.request(request).await.map_err(Error::ClientRequest)?;
        debug!("response {:?}", &response);

        let status = response.status();
        let body_bytes = hyper::body::to_bytes(response.into_body()).await.map_err(Error::ParseResponseBodyBytes)?;
        let body = String::from_utf8(body_bytes.to_vec()).map_err(Error::ParseResponseBody)?;

        match status {
          StatusCode::OK => Ok(Reply { payload: body }),
          _ => Err(Error::FailedRequest { status }.into()),
        }
      }),
    }
  }

  fn serialize<T: serde::Serialize>(from: &T) -> Result<String> {
    trace!("serialize from");

    serde_json::to_string(from).map_err(|e| Error::Serialize(e).into())
  }

  fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T>
  where
    T: for<'de> serde::Deserialize<'de>,
  {
    trace!("deserialize from");

    serde_json::from_str(&from).map_err(|e| Error::Deserialize(e).into())
  }
}

impl Drop for Http {
  fn drop(&mut self) {
    if self.shutdown.is_some() {
      self.stop().unwrap()
    }
  }
}