use std::collections::HashMap;
use std::io::Cursor;
use internet2::session::LocalSession;
use internet2::{zeromq, SendRecvMessage, Unmarshall, Unmarshaller, ZmqSocketType};
use super::{BusId, Error, ServiceAddress};
use crate::esb::BusConfig;
#[cfg(feature = "node")]
use crate::node::TryService;
use crate::rpc::Request;
use crate::ZMQ_CONTEXT;
pub trait Handler<B>
where
Self: Sized,
B: BusId,
Error<B::Address>: From<Self::Error>,
{
type Request: Request;
type Error: std::error::Error;
fn identity(&self) -> B::Address;
fn on_ready(&mut self, _endpoints: &mut EndpointList<B>) -> Result<(), Self::Error> { Ok(()) }
fn handle(
&mut self,
endpoints: &mut EndpointList<B>,
bus_id: B,
source: B::Address,
request: Self::Request,
) -> Result<(), Self::Error>;
fn handle_err(
&mut self,
endpoints: &mut EndpointList<B>,
error: Error<B::Address>,
) -> Result<(), Self::Error>;
}
struct Endpoint<A>
where
A: ServiceAddress,
{
pub(self) session: LocalSession,
pub(self) router: Option<A>,
}
impl<A> Endpoint<A>
where
A: ServiceAddress,
{
pub(self) fn send_to<R>(&mut self, source: A, dest: A, request: R) -> Result<(), Error<A>>
where
R: Request,
{
let data = request.serialize();
let router = match self.router {
None => {
trace!("Sending {} from {} to {} directly", request, source, dest,);
dest.clone()
}
Some(ref router) if &source == router => {
trace!("Routing {} from {} to {}", request, source, dest,);
dest.clone()
}
Some(ref router) => {
trace!("Sending {} from {} to {} via router {}", request, source, dest, router,);
router.clone()
}
};
let src = source.clone();
let dst = dest.clone();
self.session
.send_routed_message(&source.into(), &router.into(), &dest.into(), &data)
.map_err(|err| Error::Send(src, dst, err))?;
Ok(())
}
#[inline]
pub(self) fn set_identity(&mut self, identity: A) -> Result<(), Error<A>> {
self.session.set_identity(&identity.into(), &ZMQ_CONTEXT).map_err(Error::from)
}
}
#[derive(Default)]
pub struct EndpointList<B>(pub(self) HashMap<B, Endpoint<B::Address>>)
where
B: BusId;
impl<B> EndpointList<B>
where
B: BusId,
{
pub fn new() -> Self { Self(Default::default()) }
pub fn send_to<R>(
&mut self,
bus_id: B,
source: B::Address,
dest: B::Address,
request: R,
) -> Result<(), Error<B::Address>>
where
R: Request,
{
let session =
self.0.get_mut(&bus_id).ok_or_else(|| Error::UnknownBusId(bus_id.to_string()))?;
session.send_to(source, dest, request)
}
pub fn set_identity(
&mut self,
bus_id: B,
identity: B::Address,
) -> Result<(), Error<B::Address>> {
self.0
.get_mut(&bus_id)
.ok_or_else(|| Error::UnknownBusId(bus_id.to_string()))?
.set_identity(identity)
}
}
#[derive(Getters)]
pub struct Controller<B, R, H>
where
R: Request,
B: BusId,
H: Handler<B, Request = R>,
Error<B::Address>: From<H::Error>,
{
endpoints: EndpointList<B>,
unmarshaller: Unmarshaller<R>,
handler: H,
}
#[derive(Debug)]
pub struct PollItem<B, R>
where
B: BusId,
R: Request,
{
pub bus_id: B,
pub source: B::Address,
pub request: R,
}
impl<B, R, H> Controller<B, R, H>
where
R: Request,
B: BusId,
H: Handler<B, Request = R>,
Error<B::Address>: From<H::Error>,
{
pub fn with(
service_bus: HashMap<B, BusConfig<B::Address>>,
handler: H,
) -> Result<Self, Error<B::Address>> {
let endpoints = EndpointList::new();
let unmarshaller = R::create_unmarshaller();
let mut me = Self { endpoints, unmarshaller, handler };
for (id, config) in service_bus {
me.add_service_bus(id, config)?;
}
Ok(me)
}
pub fn add_service_bus(
&mut self,
id: B,
config: BusConfig<B::Address>,
) -> Result<(), Error<B::Address>> {
let session = match config.carrier {
zeromq::Carrier::Locator(locator) => {
debug!(
"Creating ESB session for service {} located at {} with identity '{}'",
id,
locator,
self.handler.identity()
);
LocalSession::connect(
config.api_type,
&locator,
None,
Some(&self.handler.identity().into()),
&ZMQ_CONTEXT,
)?
}
zeromq::Carrier::Socket(socket) => {
debug!("Creating ESB session for service {}", &id);
LocalSession::with_zmq_socket(config.api_type, socket)
}
};
if !config.queued {
session.as_socket().set_router_mandatory(true)?;
}
if config.api_type == ZmqSocketType::Sub {
session.as_socket().set_subscribe(config.topic.unwrap_or_default().as_bytes())?;
}
let router = match config.router {
Some(router) if router == self.handler.identity() => None,
router => router,
};
self.endpoints.0.insert(id, Endpoint { session, router });
Ok(())
}
pub fn send_to(
&mut self,
bus_id: B,
dest: B::Address,
request: R,
) -> Result<(), Error<B::Address>> {
self.endpoints.send_to(bus_id, self.handler.identity(), dest, request)
}
pub fn recv_poll(&mut self) -> Result<Vec<PollItem<B, R>>, Error<B::Address>> {
let mut vec = vec![];
for bus_id in self.poll()? {
let sender = self.endpoints.0.get_mut(&bus_id).expect("must exist, just indexed");
let routed_frame = sender.session.recv_routed_message()?;
let request = (*self.unmarshaller.unmarshall(Cursor::new(routed_frame.msg))?).clone();
let source = B::Address::from(routed_frame.src);
vec.push(PollItem { bus_id, source, request });
}
Ok(vec)
}
}
#[cfg(feature = "node")]
impl<B, R, H> TryService for Controller<B, R, H>
where
R: Request,
B: BusId,
H: Handler<B, Request = R>,
Error<B::Address>: From<H::Error>,
{
type ErrorType = Error<B::Address>;
fn try_run_loop(mut self) -> Result<(), Self::ErrorType> {
self.handler.on_ready(&mut self.endpoints)?;
loop {
match self.run() {
Ok(_) => trace!("request processing complete"),
Err(err) => {
error!("ESB request processing error: {}", err);
self.handler.handle_err(&mut self.endpoints, err)?;
}
}
}
}
}
impl<B, R, H> Controller<B, R, H>
where
R: Request,
B: BusId,
H: Handler<B, Request = R>,
Error<B::Address>: From<H::Error>,
{
#[cfg(feature = "node")]
fn run(&mut self) -> Result<(), Error<B::Address>> {
for bus_id in self.poll()? {
let sender = self.endpoints.0.get_mut(&bus_id).expect("must exist, just indexed");
let routed_frame = sender.session.recv_routed_message()?;
let request = (*self.unmarshaller.unmarshall(Cursor::new(routed_frame.msg))?).clone();
let source = B::Address::from(routed_frame.src);
let dest = B::Address::from(routed_frame.dst);
if dest == self.handler.identity() {
debug!("{} -> {}: {}", source, dest, request);
self.handler.handle(&mut self.endpoints, bus_id, source, request)?;
} else {
trace!("Routing {} from {} to {}", request, source, dest);
self.endpoints.send_to(bus_id, source, dest, request)?
}
}
Ok(())
}
fn poll(&mut self) -> Result<Vec<B>, Error<B::Address>> {
let mut index = vec![];
let mut items = self
.endpoints
.0
.iter()
.map(|(service, sender)| {
index.push(service);
sender.session.as_socket().as_poll_item(zmq::POLLIN | zmq::POLLERR)
})
.collect::<Vec<_>>();
trace!("Awaiting for ESB request from {} service buses...", items.len());
let _ = zmq::poll(&mut items, -1)?;
let service_buses = items
.iter()
.enumerate()
.filter_map(
|(i, item)| {
if item.get_revents().is_empty() {
None
} else {
Some(*index[i])
}
},
)
.collect::<Vec<_>>();
trace!("Received ESB request from {} service busses...", service_buses.len());
Ok(service_buses)
}
}