ya-service-bus 0.7.3

Golem Service Bus
Documentation
use super::Handle;
use crate::error::Error;
use crate::local_router::router;
use crate::ResponseChunk;
use futures::{Future, Stream, StreamExt};
use std::pin::Pin;

pub fn send(
    addr: &str,
    caller: &str,
    bytes: &[u8],
) -> impl Future<Output = Result<Vec<u8>, Error>> {
    forward_bytes(addr, caller, bytes, false)
}

pub fn push(
    addr: &str,
    caller: &str,
    bytes: &[u8],
) -> impl Future<Output = Result<Vec<u8>, Error>> {
    forward_bytes(addr, caller, bytes, true)
}

pub fn call_stream(
    addr: &str,
    caller: &str,
    bytes: &[u8],
) -> Pin<Box<dyn Stream<Item = Result<ResponseChunk, Error>>>> {
    router()
        .lock()
        .unwrap()
        .streaming_forward_bytes(addr, caller, bytes.into())
        .boxed_local()
}

fn forward_bytes(
    addr: &str,
    caller: &str,
    bytes: &[u8],
    no_reply: bool,
) -> impl Future<Output = Result<Vec<u8>, Error>> {
    router()
        .lock()
        .unwrap()
        .forward_bytes(addr, caller, bytes.into(), no_reply)
}

pub trait RawHandler {
    type Result: Future<Output = Result<Vec<u8>, Error>>;

    fn handle(&mut self, caller: &str, addr: &str, msg: &[u8], no_reply: bool) -> Self::Result;
}

impl<
        Output: Future<Output = Result<Vec<u8>, Error>>,
        F: FnMut(&str, &str, &[u8]) -> Output + 'static,
    > RawHandler for F
{
    type Result = Output;

    fn handle(&mut self, caller: &str, addr: &str, msg: &[u8], _no_reply: bool) -> Self::Result {
        self(caller, addr, msg)
    }
}

pub trait RawStreamHandler {
    type Result: Stream<Item = Result<ResponseChunk, Error>>;

    fn handle(&mut self, caller: &str, addr: &str, msg: &[u8], _no_reply: bool) -> Self::Result;
}

impl<
        Output: Stream<Item = Result<ResponseChunk, Error>>,
        F: FnMut(&str, &str, &[u8]) -> Output + 'static,
    > RawStreamHandler for F
{
    type Result = Output;

    fn handle(&mut self, caller: &str, addr: &str, msg: &[u8], _no_reply: bool) -> Self::Result {
        self(caller, addr, msg)
    }
}

impl RawStreamHandler for () {
    type Result = Pin<Box<dyn Stream<Item = Result<ResponseChunk, Error>>>>;

    fn handle(&mut self, _: &str, addr: &str, _: &[u8], _: bool) -> Self::Result {
        let addr = addr.to_string();
        futures::stream::once(async { Err(Error::NoEndpoint(addr)) }).boxed_local()
    }
}

pub struct Fn4Handler<R> {
    #[allow(clippy::type_complexity)]
    f: Box<dyn FnMut(&str, &str, &[u8], bool) -> R>,
}

impl<Fut> RawHandler for Fn4Handler<Fut>
where
    Fut: Future<Output = Result<Vec<u8>, Error>>,
{
    type Result = Fut;

    fn handle(&mut self, caller: &str, addr: &str, msg: &[u8], no_reply: bool) -> Self::Result {
        (*self.f)(caller, addr, msg, no_reply)
    }
}

impl<S> RawStreamHandler for Fn4Handler<S>
where
    S: Stream<Item = Result<ResponseChunk, Error>>,
{
    type Result = S;

    fn handle(&mut self, caller: &str, addr: &str, msg: &[u8], no_reply: bool) -> Self::Result {
        (*self.f)(caller, addr, msg, no_reply)
    }
}

pub trait Fn4HandlerExt<Fut>
where
    Fut: Future<Output = Result<Vec<u8>, Error>>,
{
    fn into_handler(self) -> Fn4Handler<Fut>;
}

impl<F, Fut> Fn4HandlerExt<Fut> for F
where
    Fut: Future<Output = Result<Vec<u8>, Error>>,
    F: FnMut(&str, &str, &[u8], bool) -> Fut + 'static,
{
    fn into_handler(self) -> Fn4Handler<Fut> {
        Fn4Handler { f: Box::new(self) }
    }
}

pub trait Fn4StreamHandlerExt<S>
where
    S: Stream<Item = Result<ResponseChunk, Error>>,
{
    fn into_stream_handler(self) -> Fn4Handler<S>;
}

impl<F, S> Fn4StreamHandlerExt<S> for F
where
    S: Stream<Item = Result<ResponseChunk, Error>>,
    F: FnMut(&str, &str, &[u8], bool) -> S + 'static,
{
    fn into_stream_handler(self) -> Fn4Handler<S> {
        Fn4Handler { f: Box::new(self) }
    }
}

mod raw_actor {
    use super::{Error, RawHandler};
    use crate::untyped::RawStreamHandler;
    use crate::{RpcRawCall, RpcRawStreamCall};
    use actix::prelude::*;
    use futures::{FutureExt, SinkExt, StreamExt};

    struct RawHandlerActor<H, S> {
        handler: H,
        stream_handler: S,
    }

    impl<H: Unpin + 'static, S: Unpin + 'static> Actor for RawHandlerActor<H, S> {
        type Context = Context<Self>;
    }

    impl<H: RawHandler + Unpin + 'static, S: Unpin + 'static> Handler<RpcRawCall>
        for RawHandlerActor<H, S>
    {
        type Result = ActorResponse<Self, Result<Vec<u8>, Error>>;

        fn handle(&mut self, msg: RpcRawCall, _ctx: &mut Self::Context) -> Self::Result {
            ActorResponse::r#async(
                self.handler
                    .handle(&msg.caller, &msg.addr, msg.body.as_ref(), msg.no_reply)
                    .boxed_local()
                    .into_actor(self),
            )
        }
    }

    impl<H: Unpin + 'static, S: RawStreamHandler + Unpin + 'static> Handler<RpcRawStreamCall>
        for RawHandlerActor<H, S>
    {
        type Result = Result<(), Error>;

        fn handle(&mut self, msg: RpcRawStreamCall, ctx: &mut Self::Context) -> Self::Result {
            let stream =
                self.stream_handler
                    .handle(&msg.caller, &msg.addr, msg.body.as_ref(), false);
            let sink = msg
                .reply
                .sink_map_err(|e| Error::GsbFailure(e.to_string()))
                .with(|r| futures::future::ready(Ok(Ok(r))));

            ctx.spawn(stream.forward(sink).map(|_| ()).into_actor(self));
            Ok(())
        }
    }

    pub fn recipients(
        h: impl RawHandler + Unpin + 'static,
        s: impl RawStreamHandler + Unpin + 'static,
    ) -> (Recipient<RpcRawCall>, Recipient<RpcRawStreamCall>) {
        let addr = RawHandlerActor {
            handler: h,
            stream_handler: s,
        }
        .start();
        (addr.clone().recipient(), addr.recipient())
    }
}

pub fn subscribe(
    addr: &str,
    rpc: impl RawHandler + Unpin + 'static,
    stream: impl RawStreamHandler + Unpin + 'static,
) -> Handle {
    let (rr, rs) = raw_actor::recipients(rpc, stream);
    router().lock().unwrap().bind_raw_dual(addr, rr, rs)
}