merfolk_backend_serialport 0.1.0

A serial port `Backend` for merfolk.
Documentation
use merfolk::{interfaces::Backend, Call, Reply};

use anyhow::Result;
use serde::{Deserialize, Serialize};
use thiserror::Error;

use std::{fmt::Debug, sync::Arc};
use tokio::{runtime::Runtime, sync::Mutex};

use log::{debug, error, info, trace};

#[derive(Debug, Error)]
pub enum Error {
  #[error("serializing failed: {0}")]
  Serialize(#[source] ron::Error),
  #[error("deserializing failed: {0}")]
  Deserialize(#[source] ron::Error),
  #[error("no receiver was degistered by init()")]
  NoReceiver,
  #[error("could not create runtime: {0}")]
  RuntimeCreation(#[from] std::io::Error),
  #[error("already started")]
  AlreadyStarted,
  #[error("not started")]
  NotStarted,
  #[error("error while sending: {0}")]
  SendError(#[source] std::io::Error),
  #[error("no sender channel still alive")]
  NoSenderChannel,
  #[error("from frontend: {0}")]
  FromFrontend(#[source] anyhow::Error),
  #[error("{0} must be initialized")]
  Init(String),
}

#[derive(derive_builder::Builder)]
#[builder(pattern = "owned")]
pub struct SerialPort {
  #[builder(setter(name = "port_setter"), private)]
  port: Arc<Mutex<Box<dyn serialport::SerialPort>>>,

  #[allow(clippy::type_complexity)]
  #[builder(private, default = "None")]
  receiver: Option<Arc<dyn Fn(Call<String>) -> Result<Reply<String>> + Send + Sync>>,

  #[builder(private, default = "None")]
  reply_queue: Option<Arc<Mutex<tokio::sync::mpsc::Receiver<String>>>>,

  #[builder(private, default = "Runtime::new().map_err(Error::RuntimeCreation).map_err(|e| e.to_string())?")]
  runtime: Runtime,

  #[builder(private, default = "None")]
  handle: Option<tokio::task::JoinHandle<std::convert::Infallible>>,
}

impl SerialPortBuilder {
  pub fn port<S: 'static + serialport::SerialPort>(self, value: S) -> Self {
    self.port_setter(Arc::new(Mutex::new(Box::new(value))))
  }
}

impl SerialPort {
  pub fn builder() -> SerialPortBuilder {
    SerialPortBuilder::default()
  }
}

#[derive(Serialize, Deserialize)]
struct SelfCall {
  procedure: String,
  payload: String,
}

#[derive(Serialize, Deserialize)]
struct SelfReply {
  payload: String,
}

impl SerialPort {
  pub fn start(&mut self) -> Result<()> {
    trace!("start SerialPort Backend");

    if self.handle.is_some() {
      return Err(Error::AlreadyStarted.into());
    }

    let receiver = Arc::clone(self.receiver.as_ref().ok_or(Error::NoReceiver)?);

    let (tx, rx) = tokio::sync::mpsc::channel::<String>(2);

    self.reply_queue = Some(Arc::new(Mutex::new(rx)));

    let port = Arc::clone(&self.port);

    self.handle = Some(self.runtime.spawn(async move {
      trace!("spawn listener");

      loop {
        trace!("reading serialport");

        let mut read: Vec<u8> = vec![];

        let mut port_gate = port.lock().await;

        loop {
          let mut buf: Vec<u8> = vec![0; 1024];

          match port_gate.read(buf.as_mut_slice()) {
            Ok(n) => {
              debug!("{} read {} bytes", port_gate.name().unwrap_or_else(|| "".to_string()), n);
              read.append(&mut buf[0..n].to_vec());

              if n != buf.len() {
                break;
              }
            }
            Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => {
              debug!("{} read timeout", port_gate.name().unwrap_or_else(|| "".to_string()));
              break;
            }
            Err(e) => {
              error!("{:?}", e);
              break;
            }
          }
        }

        if !read.is_empty() {
          if let Ok(read_string) = String::from_utf8(read) {
            let read_parts = read_string.split("\r\n");

            for part in read_parts {
              if part.is_empty() {
                break;
              }

              match &part[0..2] {
                "r:" => {
                  debug!("{} read reply", port_gate.name().unwrap_or_else(|| "".to_string()));

                  match tx.send(part[2..].to_string()).await {
                    Ok(_) => {}
                    Err(e) => {
                      for _ in 0..2 {
                        match port_gate.write(&("r:".to_string() + &Self::serialize(&Err::<SelfReply, _>(e.to_string())).unwrap() + "\r\n").as_bytes()) {
                          Ok(n) => {
                            debug!("{} sent r: {} bytes", port_gate.name().unwrap_or_else(|| "".to_string()), n);
                            break;
                          }
                          Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => (),
                          Err(e) => log::error!("{:?}", e),
                        }
                      }
                    }
                  }
                }
                "c:" => {
                  debug!("{} read call", port_gate.name().unwrap_or_else(|| "".to_string()));

                  let read_unpacked = part[2..].to_string();

                  let self_reply_string = match Self::deserialize::<SelfCall>(&read_unpacked) {
                    Ok(self_call) => {
                      let reply = receiver(Call {
                        procedure: self_call.procedure,
                        payload: self_call.payload,
                      });

                      let self_reply = match reply.map(|r| SelfReply { payload: r.payload }) {
                        Ok(ok) => std::result::Result::Ok(ok),
                        Err(err) => std::result::Result::Err(err.to_string()),
                      };
                      match &Self::serialize(&self_reply) {
                        Ok(ser) => "r:".to_string() + ser + "\r\n",
                        Err(e) => "r:".to_string() + &Self::serialize(&Err::<SelfReply, _>(e.to_string())).unwrap() + "\r\n",
                      }
                    }
                    Err(e) => "r:".to_string() + &Self::serialize(&Err::<SelfReply, _>(e.to_string())).unwrap() + "\r\n",
                  };

                  for _ in 0..2 {
                    match port_gate.write(&self_reply_string.as_bytes()) {
                      Ok(n) => {
                        debug!("{} sent r: {} bytes", port_gate.name().unwrap_or_else(|| "".to_string()), n);
                        break;
                      }
                      Err(ref e) if e.kind() == std::io::ErrorKind::TimedOut => (),
                      Err(e) => log::error!("{:?}", e),
                    }
                  }
                }
                _ => {}
              }
            }
          };
        }
      }
    }));
    Ok(())
  }

  pub fn stop(&mut self) -> Result<()> {
    trace!("stop serialport backend");
    match &self.handle {
      None => Err(Error::NotStarted.into()),
      Some(handle) => {
        handle.abort();
        Ok(())
      }
    }
  }
}
impl Backend for SerialPort {
  type Intermediate = String;

  fn register<T>(&mut self, receiver: T) -> Result<()>
  where
    T: Fn(Call<Self::Intermediate>) -> Result<Reply<Self::Intermediate>> + Send + Sync + 'static,
  {
    trace!("register receiver");

    self.receiver = Some(Arc::new(move |call: Call<String>| {
      trace!("run receiver");

      debug!("calling receiver");
      receiver(call)
    }));

    self.start().ok();

    Ok(())
  }

  fn call(&mut self, call: Call<Self::Intermediate>) -> Result<Reply<Self::Intermediate>> {
    trace!("call backend");

    info!("received outgoing call");

    if self.reply_queue.is_none() {
      return Err(Error::NotStarted.into());
    }

    let port = Arc::clone(&self.port);
    let reply_queue = Arc::clone(&self.reply_queue.as_ref().unwrap());

    self.runtime.block_on(async move {
      let self_call = SelfCall {
        procedure: call.procedure,
        payload: call.payload,
      };
      let self_call_string = "c:".to_string() + &Self::serialize(&self_call).unwrap() + "\r\n";

      let port_name;
      let written;
      {
        let mut port_gate = port.lock().await;

        port_name = port_gate.name().unwrap_or_else(|| "".to_string());

        written = port_gate.write(&self_call_string.as_bytes());
      }

      match written {
        Ok(n) => {
          debug!("{} sent c: {} bytes", port_name, n);
          let mut queue_lock = reply_queue.lock().await;

          match queue_lock.recv().await {
            Some(self_reply_string) => Ok(Reply {
              payload: Self::deserialize::<Result<SelfReply, String>>(&self_reply_string)?
                .map_err(|e| Error::FromFrontend(anyhow::anyhow!(e)))?
                .payload,
            }),
            None => Err(Error::NoSenderChannel.into()),
          }
        }
        Err(e) => Err(Error::SendError(e).into()),
      }
    })
  }

  fn serialize<T: serde::Serialize>(from: &T) -> Result<String> {
    trace!("serialize from");

    ron::ser::to_string(from).map_err(|e| Error::Serialize(e).into())
  }

  fn deserialize<'b, T>(from: &'b Self::Intermediate) -> Result<T>
  where
    T: for<'de> serde::Deserialize<'de>,
  {
    trace!("deserialize from");

    ron::de::from_str(&from).map_err(|e| Error::Deserialize(e).into())
  }
}

impl Drop for SerialPort {
  fn drop(&mut self) {
    if self.handle.is_some() {
      self.stop().unwrap()
    }
  }
}