rplc 0.3.0

PLC programming in Rust
Documentation
use crate::tasks;
use eva_common::payload::{pack, unpack};
use eva_common::value::{to_value, Value};
use eva_common::{EResult, Error};
use log::error;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::fs;
use std::io;
use std::io::{Read, Write};
use std::os::unix;
use std::path::PathBuf;

const JSON_RPC: &str = "2.0";
const MAX_API_CONN: usize = 10;

#[derive(Serialize, Deserialize)]
pub struct Request {
    jsonrpc: String,
    method: String,
    params: Option<Value>,
}

impl Request {
    pub fn new(method: &str, params: Option<Value>) -> Self {
        Self {
            jsonrpc: JSON_RPC.to_owned(),
            method: method.to_owned(),
            params,
        }
    }
    fn check(&self) -> EResult<()> {
        if self.jsonrpc == JSON_RPC {
            Ok(())
        } else {
            Err(Error::unsupported("unsupported json rpc version"))
        }
    }
}

#[derive(Serialize, Deserialize)]
pub struct Response {
    jsonrpc: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<ResponseError>,
}

impl Response {
    #[inline]
    fn err(e: Error) -> Self {
        Self {
            jsonrpc: JSON_RPC.to_owned(),
            result: None,
            error: Some(ResponseError {
                code: e.kind() as i16,
                message: e.message().map(ToOwned::to_owned),
            }),
        }
    }
    #[inline]
    fn result(val: Value) -> Self {
        Self {
            jsonrpc: JSON_RPC.to_owned(),
            result: Some(val),
            error: None,
        }
    }
    pub fn check(&self) -> EResult<()> {
        if self.jsonrpc != JSON_RPC {
            return Err(Error::unsupported("unsupported json rpc version"));
        }
        if let Some(ref err) = self.error {
            return Err(Error::newc(
                eva_common::ErrorKind::from(err.code),
                err.message.as_deref(),
            ));
        }
        Ok(())
    }
}

#[derive(Serialize, Deserialize)]
struct ResponseError {
    code: i16,
    message: Option<String>,
}

impl From<EResult<Value>> for Response {
    fn from(r: EResult<Value>) -> Self {
        match r {
            Ok(v) => Response::result(v),
            Err(e) => Response::err(e),
        }
    }
}

pub(crate) fn spawn_api() -> PathBuf {
    let mut socket_path = crate::var_dir();
    socket_path.push(format!("{}.plcsock", crate::name()));
    let _ = fs::remove_file(&socket_path);
    let listener = unix::net::UnixListener::bind(&socket_path).unwrap();
    tasks::spawn_service("api", move || {
        let pool = threadpool::ThreadPool::new(MAX_API_CONN);
        for sr in listener.incoming() {
            match sr {
                Ok(stream) => {
                    pool.execute(move || {
                        if let Err(e) = handle_api_stream(stream) {
                            error!("API {}", e);
                        }
                    });
                }
                Err(e) => error!("API {}", e),
            }
        }
    });
    socket_path
}

fn handle_api_stream(mut stream: unix::net::UnixStream) -> Result<(), Error> {
    stream.set_read_timeout(Some(crate::DEFAULT_TIMEOUT))?;
    stream.set_write_timeout(Some(crate::DEFAULT_TIMEOUT))?;
    loop {
        let mut buf: [u8; 5] = [0; 5];
        match stream.read_exact(&mut buf) {
            Ok(()) => {}
            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
                break;
            }
            Err(e) => {
                return Err(e.into());
            }
        }
        if buf[0] != 0 {
            return Err(Error::invalid_data("invalid header"));
        }
        let mut buf = vec![0; usize::try_from(u32::from_le_bytes(buf[1..].try_into()?))?];
        stream.read_exact(&mut buf)?;
        let req: Request = unpack(&buf)?;
        req.check()?;
        let response: Response = handle_api_call(&req.method, req.params).into();
        let packed = pack(&response)?;
        let mut buf = Vec::with_capacity(packed.len() + 5);
        buf.push(0u8);
        buf.extend(u32::try_from(packed.len())?.to_le_bytes());
        buf.extend(packed);
        stream.write_all(&buf)?;
    }
    Ok(())
}

fn handle_api_call(method: &str, params: Option<Value>) -> Result<Value, Error> {
    macro_rules! ok {
        () => {
            Ok(Value::Unit)
        };
    }
    macro_rules! invalid_params {
        () => {
            Err(Error::invalid_params("invalid method parameters"))
        };
    }
    match method {
        "test" => {
            if params.is_none() {
                ok!()
            } else {
                invalid_params!()
            }
        }
        "info" => {
            if params.is_none() {
                to_value(crate::plc_info()).map_err(Into::into)
            } else {
                invalid_params!()
            }
        }
        "thread_stats.get" => {
            if params.is_none() {
                let mut result = BTreeMap::new();
                let thread_stats = &tasks::controller_stats().lock().thread_stats;
                for (name, st) in thread_stats {
                    result.insert(name, st.info());
                }
                to_value(result).map_err(Into::into)
            } else {
                invalid_params!()
            }
        }
        "thread_stats.reset" => {
            if params.is_none() {
                tasks::reset_thread_stats();
                ok!()
            } else {
                invalid_params!()
            }
        }
        v => Err(Error::not_implemented(v)),
    }
}