1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use merfolk::{
  interfaces::{Backend, Frontend},
  Call, Reply,
};

use std::{
  collections::HashMap,
  sync::{Arc, Mutex},
};

use anyhow::Result;
use thiserror::Error;

use log::trace;

#[derive(Debug, Error)]
pub enum Error {
  #[error("backend error: {0}")]
  FromBackend(#[from] anyhow::Error),
  #[error("called procedure is not registered: {0}")]
  ProcedureNotRegistered(String),
  #[error("procedures lock was poinsoned")]
  Lock,
  #[error("call not registered merfolk init()")]
  CallNotRegistered,
}

#[derive(derive_builder::Builder)]
#[builder(pattern = "owned")]
pub struct Register<'a, B: Backend> {
  #[allow(clippy::type_complexity)]
  #[builder(setter(name = "procedures_setter"), private, default = "Arc::new(Mutex::new(HashMap::new()))")]
  procedures: Arc<Mutex<HashMap<String, Box<dyn Fn(Call<B::Intermediate>) -> Result<Reply<B::Intermediate>> + 'a>>>>,

  #[allow(clippy::type_complexity)]
  #[builder(private, default = "None")]
  call: Option<Box<dyn Fn(Call<B::Intermediate>) -> Result<Reply<B::Intermediate>> + 'a + Send>>,
}

impl<'a, B: Backend> RegisterBuilder<'a, B> {
  #[allow(clippy::type_complexity)]
  pub fn procedures(self, value: HashMap<&'a str, Box<dyn Fn(Call<B::Intermediate>) -> Result<Reply<B::Intermediate>> + 'a>>) -> Self {
    self.procedures_setter(Arc::new(Mutex::new(value.into_iter().fold(HashMap::new(), |mut acc, p| {
      acc.insert(p.0.to_string(), p.1);
      acc
    }))))
  }
}

impl<'a, B: Backend> Register<'a, B> {
  pub fn builder() -> RegisterBuilder<'a, B> {
    RegisterBuilder::default()
  }
}

unsafe impl<'a, T: Backend> Send for Register<'a, T> {}

impl<'a, B: Backend> Register<'a, B> {
  #[allow(clippy::type_complexity)]
  pub fn make_procedure<P, C: for<'de> serde::Deserialize<'de>, R: serde::Serialize>(procedure: P) -> Box<dyn Fn(Call<B::Intermediate>) -> Result<Reply<B::Intermediate>> + 'a>
  where
    P: Fn(C) -> R + 'a,
  {
    Box::new(move |call: Call<B::Intermediate>| {
      let reply = procedure(B::deserialize::<C>(&call.payload)?);
      Ok(Reply { payload: B::serialize::<R>(&reply)? })
    })
  }

  pub fn register<P, C: for<'de> serde::Deserialize<'de>, R: serde::Serialize>(&self, name: &str, procedure: P) -> Result<()>
  where
    P: Fn(C) -> R + 'a,
  {
    trace!("register procedure");

    self.procedures.lock().map_err(|_| Error::Lock)?.insert(
      name.to_string(),
      Box::new(move |call: Call<B::Intermediate>| {
        let reply = procedure(B::deserialize::<C>(&call.payload)?);
        Ok(Reply { payload: B::serialize::<R>(&reply)? })
      }),
    );
    Ok(())
  }

  pub fn call<C: serde::Serialize, R: for<'de> serde::Deserialize<'de>>(&self, procedure: &str, payload: &C) -> Result<R> {
    trace!("call procedure");

    Ok(B::deserialize(
      &self.call.as_ref().ok_or(Error::CallNotRegistered)?(Call {
        procedure: procedure.to_string(),
        payload: B::serialize(&payload)?,
      })?
      .payload,
    )?)
  }
}

impl<'a, B: Backend> Frontend for Register<'a, B> {
  type Backend = B;

  fn register<T>(&mut self, caller: T) -> Result<()>
  where
    T: Fn(Call<<Self::Backend as Backend>::Intermediate>) -> Result<Reply<<Self::Backend as Backend>::Intermediate>> + 'a + Send,
  {
    trace!("register caller");

    self.call = Some(Box::new(caller));
    Ok(())
  }

  #[allow(clippy::type_complexity)]
  fn receive(&self, call: Call<<Self::Backend as Backend>::Intermediate>) -> Result<Reply<<Self::Backend as Backend>::Intermediate>> {
    trace!("receive call");

    self
      .procedures
      .lock()
      .map_err(|_| Error::Lock)?
      .get(&call.procedure)
      .ok_or_else::<anyhow::Error, _>(|| Error::ProcedureNotRegistered(call.procedure.to_owned()).into())?(call)
  }
}