rpcx_protocol 0.3.0

A rpc/microservice framework.
Documentation
use crate::Result;

use std::{
    cell::RefCell,
    fmt::Debug,
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
};

use crate::SerializeType;

use bytes::BytesMut;

use super::Error;

pub trait RpcxParam: Debug {
    fn into_bytes(&self, st: SerializeType) -> Result<Vec<u8>>;
    fn from_slice(&mut self, st: SerializeType, data: &[u8]) -> Result<()>;
}

impl RpcxParam for BytesMut {
    fn into_bytes(&self, _: SerializeType) -> Result<Vec<u8>> {
        let rt = self.to_vec();
        Ok(rt)
    }
    fn from_slice(&mut self, _: SerializeType, data: &[u8]) -> Result<()> {
        (*self).extend_from_slice(data);
        Ok(())
    }
}

#[derive(Debug)]
pub struct Status {
    pub ready: bool,
    pub task: Option<Waker>,
}

#[derive(Debug)]
pub struct Call {
    pub seq: u64,
    pub is_client_error: bool,
    pub state: Arc<Mutex<Status>>,
    pub error: String,
    pub reply_data: Vec<u8>,
}

impl Call {
    pub fn new(seq: u64) -> Self {
        Call {
            seq,
            is_client_error: true,
            state: Arc::new(Mutex::new(Status {
                ready: false,
                task: None,
            })),
            error: String::new(),
            reply_data: Vec::new(),
        }
    }
}

pub type ArcCall = Arc<Mutex<RefCell<Call>>>;

pub fn get_result<T>(arc_call: Option<ArcCall>, st: SerializeType) -> Result<T>
where
    T: RpcxParam + Default,
{
    if arc_call.is_none() {
        return Err(Error::from("reply is empty"));
    }
    let arc_call_1 = arc_call.unwrap().clone();
    let mut arc_call_2 = arc_call_1.lock().unwrap();
    let arc_call_3 = arc_call_2.get_mut();
    let reply_data = &arc_call_3.reply_data;
    if !arc_call_3.error.is_empty() {
        let err = &arc_call_3.error;
        return Err(Error::from(String::from(err)));
    }

    let mut reply: T = Default::default();
    match reply.from_slice(st, &reply_data) {
        Ok(()) => Ok(reply),
        Err(err) => Err(err),
    }
}
pub struct CallFuture {
    pub arc_call: Option<ArcCall>,
}

impl CallFuture {
    pub fn new(opt: Option<ArcCall>) -> Self {
        CallFuture { arc_call: opt }
    }
}

impl Future for CallFuture {
    type Output = Option<ArcCall>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if self.arc_call.is_none() {
            return Poll::Ready(None);
        }

        let arc_call = self.arc_call.as_ref().unwrap().clone();
        let mut arc_call_1 = arc_call.lock().unwrap();
        let state = &arc_call_1.get_mut().state;
        let mut status = state.lock().expect("!lock");
        if status.ready {
            Poll::Ready(Some(arc_call.clone()))
        } else {
            status.task = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

unsafe impl Send for Call {}
unsafe impl Sync for Call {}