ya-service-bus 0.7.3

Golem Service Bus
Documentation
#![allow(clippy::redundant_closure)]
use actix::Message;
use futures::prelude::Stream;
use serde::{de::DeserializeOwned, Serialize};
use std::{fmt::Debug, future::Future};

// Submodule declarations. Modules marked `pub` are exported from this crate.
pub mod actix_rpc;
pub mod connection;
pub mod error;
// The two router modules are declared without `pub`, so they are only
// reachable from inside this crate.
mod local_router;
mod remote_router;
pub mod serialization;
pub mod timeout;
#[allow(clippy::needless_doctest_main)]
pub mod typed;
pub mod untyped;

// Re-export so that `error::Error` is also reachable as `Error` at this level.
pub use error::Error;

/// A serializable request message that is answered with a single result.
///
/// Wrapped in an [`RpcEnvelope`], a message `T` is an actix [`Message`] whose
/// result type is `Result<T::Item, T::Error>`.
pub trait RpcMessage: Serialize + DeserializeOwned + 'static + Sync + Send {
    /// String identifier of this message type.
    // NOTE(review): how `ID` is used (e.g. for addressing) is not visible in
    // this file — confirm against the routing code.
    const ID: &'static str;
    /// Value carried by the `Ok` variant of the reply.
    type Item: Serialize + DeserializeOwned + 'static + Sync + Send;
    /// Value carried by the `Err` variant of the reply; must also be `Debug`.
    type Error: Serialize + DeserializeOwned + 'static + Sync + Send + Debug;
}

/// A serializable request message that is answered with a stream of results.
///
/// Each element of the response is a `Result<T::Item, T::Error>`; this is the
/// item type of [`RpcStreamCall::reply`] and of [`RpcStreamHandler::Result`].
pub trait RpcStreamMessage: Serialize + DeserializeOwned + 'static + Sync + Send {
    /// String identifier of this message type.
    // NOTE(review): how `ID` is used is not visible in this file — confirm
    // against the routing code.
    const ID: &'static str;
    /// Value carried by the `Ok` variant of each streamed element.
    type Item: Serialize + DeserializeOwned + 'static + Sync + Send;
    /// Value carried by the `Err` variant of each streamed element; must also be `Debug`.
    type Error: Serialize + DeserializeOwned + 'static + Sync + Send + Debug;
}

/// A message body of type `T` paired with the identifier of its caller.
///
/// Both fields are private to this module; the inherent `impl` in this file
/// provides `with_caller`, `local`, `caller` and `into_inner` for
/// `T: RpcMessage`.
pub struct RpcEnvelope<T> {
    // Identifier of the caller; `RpcEnvelope::local` sets it to "local".
    caller: String,
    // The wrapped message.
    body: T,
}

/// A typed streaming call: a message of type `T` together with the channel on
/// which its results are sent back.
///
/// As an actix [`Message`], its result type is `Result<(), error::Error>`.
#[derive(Debug)]
pub struct RpcStreamCall<T: RpcStreamMessage> {
    /// Identifier of the caller.
    pub caller: String,
    /// Address string of the call.
    // NOTE(review): presumably the address of the target service — confirm.
    pub addr: String,
    /// The typed request message.
    pub body: T,
    /// Sending half of a `futures` mpsc channel carrying `Result<T::Item, T::Error>` elements.
    pub reply: futures::channel::mpsc::Sender<Result<T::Item, T::Error>>,
}

/// A raw (byte-level) piece of a response.
pub enum ResponseChunk {
    /// A chunk for which [`ResponseChunk::is_full`] is `false`.
    Part(Vec<u8>),
    /// A chunk for which [`ResponseChunk::is_full`] is `true`. When its
    /// payload is empty, [`ResponseChunk::is_eos`] is `true` as well.
    Full(Vec<u8>),
}

impl ResponseChunk {
    /// Consumes the chunk and returns its payload, regardless of the variant.
    pub fn into_bytes(self) -> Vec<u8> {
        match self {
            ResponseChunk::Part(bytes) | ResponseChunk::Full(bytes) => bytes,
        }
    }

    /// Returns `true` only for the `Full` variant.
    #[inline]
    pub fn is_full(&self) -> bool {
        matches!(self, ResponseChunk::Full(_))
    }

    /// Returns `true` only for a `Full` chunk whose payload is empty.
    pub fn is_eos(&self) -> bool {
        if let ResponseChunk::Full(payload) = self {
            payload.is_empty()
        } else {
            false
        }
    }
}

/// An untyped streaming call: a byte payload together with the channel on
/// which raw response chunks are sent back.
pub struct RpcRawStreamCall {
    /// Identifier of the caller.
    pub caller: String,
    /// Address string of the call.
    // NOTE(review): presumably the address of the target service — confirm.
    pub addr: String,
    /// Request payload as raw bytes.
    // NOTE(review): presumably a serialized message — confirm against the sender.
    pub body: Vec<u8>,
    /// Sending half of a `futures` mpsc channel carrying either a
    /// [`ResponseChunk`] or an [`error::Error`] per element.
    pub reply: futures::channel::mpsc::Sender<Result<ResponseChunk, error::Error>>,
}

/// Makes `RpcRawStreamCall` an actix message. The message result is only
/// `()` or an error; response data goes through the `reply` channel field.
impl Message for RpcRawStreamCall {
    type Result = Result<(), error::Error>;
}

/// An untyped call with a byte payload.
///
/// As an actix [`Message`], its result type is `Result<Vec<u8>, error::Error>`.
pub struct RpcRawCall {
    /// Identifier of the caller.
    pub caller: String,
    /// Address string of the call.
    // NOTE(review): presumably the address of the target service — confirm.
    pub addr: String,
    /// Request payload as raw bytes (`from_envelope_addr` fills it with the
    /// serialized envelope body).
    pub body: Vec<u8>,
    /// Flag passed through unchanged by `from_envelope_addr`.
    // NOTE(review): presumably `true` means the caller does not expect a
    // response — confirm against the router code.
    pub no_reply: bool,
}

impl RpcRawCall {
    fn from_envelope_addr<T: Serialize>(
        envelope: RpcEnvelope<T>,
        addr: String,
        no_reply: bool,
    ) -> Self {
        RpcRawCall {
            caller: envelope.caller,
            addr,
            body: crate::serialization::to_vec(&envelope.body).unwrap(),
            no_reply,
        }
    }
}

/// Makes `RpcRawCall` an actix message whose result is either raw response
/// bytes or an [`error::Error`].
impl Message for RpcRawCall {
    type Result = Result<Vec<u8>, error::Error>;
}

/// Makes `RpcStreamCall<T>` an actix message. The message result is only `()`
/// or an error; the streamed elements go through the `reply` channel field.
impl<T: RpcStreamMessage> Message for RpcStreamCall<T> {
    type Result = Result<(), error::Error>;
}

impl<T: RpcMessage> RpcEnvelope<T> {
    /// Consumes the envelope and returns the wrapped message; the caller
    /// identifier is dropped.
    pub fn into_inner(self) -> T {
        let Self { body, .. } = self;
        body
    }

    /// Wraps `body` in an envelope whose caller is `caller.to_string()`.
    pub fn with_caller(caller: impl ToString, body: T) -> Self {
        let caller = caller.to_string();
        Self { caller, body }
    }

    /// Wraps `body` in an envelope whose caller is the fixed string `"local"`.
    pub fn local(body: T) -> Self {
        Self::with_caller("local", body)
    }

    /// Returns the caller identifier stored in this envelope.
    pub fn caller(&self) -> &str {
        &self.caller
    }
}

/// Makes an enveloped `RpcMessage` an actix message whose result is the
/// message's own `Result<T::Item, T::Error>`.
impl<T: RpcMessage> Message for RpcEnvelope<T> {
    type Result = Result<T::Item, T::Error>;
}

/// Borrows the wrapped message.
impl<T: RpcMessage> AsRef<T> for RpcEnvelope<T> {
    fn as_ref(&self) -> &T {
        let Self { body, .. } = self;
        body
    }
}

/// Mutably borrows the wrapped message.
impl<T: RpcMessage> AsMut<T> for RpcEnvelope<T> {
    fn as_mut(&mut self) -> &mut T {
        let Self { body, .. } = self;
        body
    }
}

/// Lets an envelope be used wherever a `&T` is expected; dereferencing yields
/// the wrapped message.
impl<T: RpcMessage> std::ops::Deref for RpcEnvelope<T> {
    type Target = T;

    fn deref(&self) -> &Self::Target {
        let Self { body, .. } = self;
        body
    }
}

/// Mutable counterpart of the `Deref` impl: yields the wrapped message.
impl<T: RpcMessage> std::ops::DerefMut for RpcEnvelope<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self { body, .. } = self;
        body
    }
}

/// A cloneable endpoint to which messages of type `T` can be sent.
pub trait RpcEndpoint<T: RpcMessage>: Clone {
    /// Future returned by the send methods.
    ///
    /// Its output is two-layered: the outer `Result` fails with
    /// [`error::Error`], and the inner value is the message's own reply,
    /// `Result<T::Item, T::Error>`.
    type Result: Future<Output = Result<<RpcEnvelope<T> as Message>::Result, error::Error>>;

    /// Sends `msg` and returns a future for the reply.
    // NOTE(review): which caller identifier is used here is decided by the
    // implementor and is not visible in this file.
    fn send(&self, msg: T) -> Self::Result;

    /// Sends `msg` with an explicitly given caller and returns a future for the reply.
    fn send_as(&self, caller: impl ToString + 'static, msg: T) -> Self::Result;
}

/// A handler for messages of type `T`.
pub trait RpcHandler<T: RpcMessage> {
    /// Future returned by [`RpcHandler::handle`]; it must be `'static` and
    /// resolves to the message's reply, `Result<T::Item, T::Error>`.
    type Result: Future<Output = <RpcEnvelope<T> as Message>::Result> + 'static;

    /// Handles `msg` received from `caller` and returns a future for the reply.
    fn handle(&mut self, caller: String, msg: T) -> Self::Result;
}

/// A handler for streaming messages of type `T`.
pub trait RpcStreamHandler<T: RpcStreamMessage> {
    /// Stream returned by [`RpcStreamHandler::handle`]; it must be `Unpin` and
    /// yields `Result<T::Item, T::Error>` elements.
    type Result: Stream<Item = Result<T::Item, T::Error>> + Unpin;

    /// Handles `msg` received from `caller` and returns the stream of results.
    fn handle(&mut self, caller: &str, msg: T) -> Self::Result;
}

/// Opaque handle type.
///
/// Its only field is `pub(crate)`, so a `Handle` value can be constructed
/// only from inside this crate.
// NOTE(review): what the handle represents is not visible in this file —
// confirm against the code that returns it.
pub struct Handle {
    // Unit placeholder; carries no data.
    pub(crate) _inner: (),
}