use crate::{RpcRawCall, RpcStreamCall, RpcStreamMessage};
use actix::prelude::*;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::sync::{Arc, Mutex};
use crate::local_router::{router, Router};
use crate::{RpcEnvelope, RpcMessage};
use super::error::Error as BusError;
use super::Handle;
use futures::Future;
pub fn bind<M: RpcMessage>(addr: &str, actor: Recipient<RpcEnvelope<M>>) -> Handle
where
<RpcEnvelope<M> as Message>::Result: Serialize + DeserializeOwned + Sync + Send,
{
router().lock().unwrap().bind_actor(addr, actor);
Handle { _inner: {} }
}
pub fn bind_raw(addr: &str, actor: Recipient<RpcRawCall>) -> Handle {
router().lock().unwrap().bind_raw(addr, actor);
Handle { _inner: {} }
}
pub fn binds<M: RpcStreamMessage>(addr: &str, actor: Recipient<RpcStreamCall<M>>) -> Handle
where
Result<M::Item, M::Error>: Serialize + DeserializeOwned + Sync + Send,
{
router().lock().unwrap().bind_stream_actor(addr, actor);
Handle { _inner: {} }
}
pub fn service(addr: &str) -> Endpoint {
Endpoint {
addr: addr.to_string(),
router: router(),
}
}
pub struct Endpoint {
addr: String,
router: Arc<Mutex<Router>>,
}
impl Endpoint {
pub fn send<M: RpcMessage + Serialize + DeserializeOwned + Sync + Send + Unpin>(
&self,
msg: M,
) -> impl Future<Output = Result<<RpcEnvelope<M> as Message>::Result, BusError>> + 'static {
let mut b = self.router.lock().unwrap();
b.forward(self.addr.as_ref(), RpcEnvelope::local(msg))
}
pub fn send_as<M: RpcMessage + Serialize + DeserializeOwned + Sync + Send + Unpin>(
&self,
caller: impl ToString,
msg: M,
) -> impl Future<Output = Result<<RpcEnvelope<M> as Message>::Result, BusError>> + 'static {
let mut b = self.router.lock().unwrap();
b.forward(self.addr.as_ref(), RpcEnvelope::with_caller(caller, msg))
}
pub fn push<M: RpcMessage + Serialize + DeserializeOwned + Sync + Send + Unpin>(
&self,
msg: M,
) -> impl Future<Output = Result<(), BusError>> + 'static {
let mut b = self.router.lock().unwrap();
b.push(self.addr.as_ref(), RpcEnvelope::local(msg))
}
pub fn push_as<M: RpcMessage + Serialize + DeserializeOwned + Sync + Send + Unpin>(
&self,
caller: impl ToString,
msg: M,
) -> impl Future<Output = Result<(), BusError>> + 'static {
let mut b = self.router.lock().unwrap();
b.push(self.addr.as_ref(), RpcEnvelope::with_caller(caller, msg))
}
pub fn call_stream<M: RpcStreamMessage>(
&self,
msg: M,
) -> impl Stream<Item = Result<Result<M::Item, M::Error>, BusError>> {
self.router
.lock()
.unwrap()
.streaming_forward(&self.addr, msg)
}
}