shvrpc 15.0.0

Rust implementation of the SHV-RPC protocol
Documentation
use std::sync::OnceLock;
use std::{env, fmt};
use std::io::BufReader;
use anyhow::anyhow;
use shvproto::{ChainPackReader, ChainPackWriter, MetaMap, RpcValue};
use shvproto::writer::Writer;
use shvproto::reader::Reader;
use crate::{RpcMessage, rpcmessage, RpcMessageMetaTags, rpctype};
use crate::util::hex_string;

#[derive(Clone, Debug)]
pub struct RpcFrame {
    pub protocol: Protocol,
    pub meta: MetaMap,
    raw_data: Vec<u8>,
    data_start: usize,
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Protocol {
    ResetSession = 0,
    ChainPack = 1,
}
impl fmt::Display for Protocol {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Protocol::ChainPack => write!(fmt, "ChainPack"),
            Protocol::ResetSession => write!(fmt, "ResetSession"),
        }
    }
}
impl PartialEq for RpcFrame {
    fn eq(&self, other: &Self) -> bool {
        self.protocol == other.protocol
            && self.meta == other.meta
            && self.data() == other.data()
    }
}

impl RpcFrame {
    pub fn new(protocol: Protocol, meta: MetaMap, data: Vec<u8>) -> Self {
        Self {
            protocol,
            meta,
            raw_data: data,
            data_start: 0
        }
    }
    pub fn new_reset_session() -> Self {
        Self {
            protocol: Protocol::ResetSession,
            meta: MetaMap::new(),
            raw_data: vec![],
            data_start: 0,
        }
    }
    pub fn data(&self) -> &[u8] {
        self.raw_data.get(self.data_start..).expect("Wrong data start")
    }
    pub fn is_error(&self) -> bool {
        let data = self.data();
        let &[first, second, ..] = data else {
            return false;
        };
        const CHAINPACK_IMAP: u8 = shvproto::chainpack::PackingSchema::IMap as u8;
        const CHAINPACK_ERR_KEY: u8 = 0x40 + rpcmessage::Key::Error as u8;
        first == CHAINPACK_IMAP && second == CHAINPACK_ERR_KEY

    }
    pub fn from_raw_data(raw_data: Vec<u8>) -> Result<Self, anyhow::Error> {
        let [proto, rest @ ..] = raw_data.as_slice() else {
            return Err(anyhow!("Empty data cannot be converted to RpcFrame"));
        };
        if *proto == Protocol::ResetSession as u8 {
            return Ok(RpcFrame::new_reset_session())
        }
        if *proto != Protocol::ChainPack as u8 {
            return Err(anyhow!("Invalid protocol type received {proto:#02x}."));
        }
        let (meta, meta_len) = {
            let mut buffrd = BufReader::new(rest);
            let mut rd = ChainPackReader::new(&mut buffrd);
            match rd.try_read_meta() {
                Ok(m) => {
                    if let Some(meta) = m {
                        (meta, rd.position())
                    } else {
                        return Err(anyhow!("Incomplete frame meta received."))
                    }
                }
                Err(e) => {
                    return Err(anyhow!("Frame meta parse error: {e}."));
                }
            }
        };
        Ok(RpcFrame {
            protocol: Protocol::ChainPack,
            meta,
            raw_data,
            data_start: meta_len + 1, // meta_len + protocol_type
        })
    }
    pub fn from_rpcmessage(msg: &RpcMessage) -> crate::Result<RpcFrame> {
        let mut data = Vec::new();
        {
            let mut wr = ChainPackWriter::new(&mut data);
            wr.write_value(&msg.as_rpcvalue().value)?;
        }
        let meta = *msg.as_rpcvalue().meta.clone().unwrap_or_default();
        Ok(RpcFrame::new(Protocol::ChainPack, meta, data))
    }
    pub fn to_rpcmesage(&self) -> crate::Result<RpcMessage> {
        let mut buff = BufReader::new(self.data());
        let value = match &self.protocol {
            Protocol::ChainPack => {
                let mut rd = ChainPackReader::new(&mut buff);
                rd.read_value()?
            }
            Protocol::ResetSession => {
                return Err("Invalid protocol".into());
            }
        };
        Ok(RpcMessage::from_rpcvalue(RpcValue::new(value, Some(self.meta.clone())))?)
    }
    pub fn prepare_response_meta(src: &MetaMap) -> Result<MetaMap, &'static str> {
        if src.is_request() {
            if let Some(rqid) = src.request_id() {
                let mut dest = MetaMap::new();
                dest.insert(rpctype::Tag::MetaTypeId as i32, RpcValue::from(rpctype::global_ns::MetaTypeID::ChainPackRpcMessage as i32));
                dest.set_request_id(rqid);
                dest.set_caller_ids(&src.caller_ids());
                return Ok(dest)
            }
            return Err("Request ID is missing")
        }
        Err("Not RPC Request")
    }
}

static RPCMSG_LOG_LENGTH_THRESHOLD: OnceLock<usize> = OnceLock::new();

pub(crate) fn rpcmsg_log_length_threshold() -> usize {
    *RPCMSG_LOG_LENGTH_THRESHOLD.get_or_init(|| {
        env::var("RPCMSG_LOG_LENGTH_THRESHOLD")
            .ok()
            .and_then(|env_var| env_var.parse().ok())
            .unwrap_or(256)
    })
}

impl fmt::Display for RpcFrame {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        if self.protocol == Protocol::ResetSession {
            write!(fmt, "RESET_SESSION")
        } else {
            write!(fmt, "{}", self.meta)?;
            let log_length_threshold = rpcmsg_log_length_threshold();
            if self.data().len() > log_length_threshold {
                write!(fmt, "[ ... {} bytes of data ... ]", self.data().len())
            } else {
                match RpcValue::from_chainpack(self.data()) {
                    Ok(rv) => {
                        write!(fmt, "{}", rv.to_cpon())
                    }
                    Err(e) => {
                        write!(fmt, "[{}] invalid data, unpack error: {}", hex_string(self.data(), Some(" ")), e)
                    }
                }
            }
        }
    }
}

impl rpcmessage::RpcMessageMetaTags for RpcFrame {
    type Target = RpcFrame;

    fn tag(&self, id: impl Into<i32>) -> Option<&RpcValue> {
        self.meta.tag(id)
    }
    fn set_tag(&mut self, id: impl Into<i32>, val: Option<impl Into<RpcValue>>) -> &mut Self::Target {
        self.meta.set_tag(id, val);
        self
    }
}

#[cfg(test)]
mod tests {
    use crate::rpcmessage::{RpcError, RpcErrorCode};
    use crate::RpcMessage;

    use super::RpcFrame;

    #[test]
    fn rpcframe_is_error() {
        let msg = RpcMessage::new_request("foo", "bar");
        let mut resp = msg.prepare_response().unwrap();
        resp.set_error(RpcError::new(RpcErrorCode::PermissionDenied, "msg"));
        let frame = RpcFrame::from_rpcmessage(&resp).unwrap();
        assert!(frame.is_error());

        let resp = msg.prepare_response().unwrap();
        let frame = RpcFrame::from_rpcmessage(&resp).unwrap();
        assert!(!frame.is_error());
    }
}