use std::fmt::Debug;
use internet2::zeromq::{self, ZmqSocketType};
use lnp_rpc::RpcMsg;
use microservices::esb::{self, ClientId};
use microservices::node::TryService;
use crate::bus::{self, BusMsg, CtlMsg, Report, ServiceBus};
use crate::rpc::{Failure, ServiceId};
use crate::{Config, Error};
pub struct BridgeHandler;
impl esb::Handler<ServiceBus> for BridgeHandler {
type Request = BusMsg;
type Error = Error;
fn identity(&self) -> ServiceId { ServiceId::Loopback }
fn handle(
&mut self,
_: &mut Endpoints,
_: ServiceBus,
_: ServiceId,
_: BusMsg,
) -> Result<(), Error> {
Ok(())
}
fn handle_err(
&mut self,
_: &mut Endpoints,
err: esb::Error<ServiceId>,
) -> Result<(), Self::Error> {
Err(err.into())
}
}
pub struct Service<Runtime>
where
Runtime: esb::Handler<ServiceBus, Request = BusMsg>,
esb::Error<ServiceId>: From<Runtime::Error>,
{
esb: esb::Controller<ServiceBus, BusMsg, Runtime>,
broker: bool,
}
impl<Runtime> Service<Runtime>
where
Runtime: esb::Handler<ServiceBus, Request = BusMsg>,
esb::Error<ServiceId>: From<Runtime::Error>,
{
pub fn run(config: Config<()>, runtime: Runtime, broker: bool) -> Result<(), Error> {
let service = Self::with(config, runtime, broker)?;
service.run_loop()?;
unreachable!()
}
fn with<Ext>(
config: Config<Ext>,
runtime: Runtime,
broker: bool,
) -> Result<Self, esb::Error<ServiceId>>
where
Ext: Clone + Eq + Debug,
{
let router = if !broker { Some(ServiceId::router()) } else { None };
let api_type =
if broker { ZmqSocketType::RouterBind } else { ZmqSocketType::RouterConnect };
let services = map! {
ServiceBus::Msg => esb::BusConfig::with_addr(
config.msg_endpoint,
api_type,
router.clone()
),
ServiceBus::Ctl => esb::BusConfig::with_addr(
config.ctl_endpoint,
api_type,
router.clone()
),
ServiceBus::Rpc => esb::BusConfig::with_addr(config.rpc_endpoint, api_type, router)
};
let esb = esb::Controller::with(services, runtime)?;
Ok(Self { esb, broker })
}
pub fn broker(config: Config<()>, runtime: Runtime) -> Result<Self, esb::Error<ServiceId>> {
Self::with(config, runtime, true)
}
#[allow(clippy::self_named_constructors)]
pub fn service<Ext>(
config: Config<Ext>,
runtime: Runtime,
) -> Result<Self, esb::Error<ServiceId>>
where
Ext: Clone + Eq + Debug,
{
Self::with(config, runtime, false)
}
pub fn is_broker(&self) -> bool { self.broker }
pub fn add_loopback(&mut self, socket: zmq::Socket) -> Result<(), esb::Error<ServiceId>> {
self.esb.add_service_bus(ServiceBus::Bridge, esb::BusConfig {
api_type: ZmqSocketType::Push,
carrier: zeromq::Carrier::Socket(socket),
router: None,
queued: true,
topic: None,
})
}
pub fn run_loop(mut self) -> Result<(), Error> {
if !self.is_broker() {
std::thread::sleep(core::time::Duration::from_secs(1));
self.esb.send_to(ServiceBus::Ctl, ServiceId::LnpBroker, BusMsg::Ctl(CtlMsg::Hello))?;
}
let identity = self.esb.handler().identity();
info!("{} started", identity);
self.esb.run_or_panic(&identity.to_string());
unreachable!()
}
}
pub type Endpoints = esb::EndpointList<ServiceBus>;
pub trait TryToServiceId {
fn try_to_service_id(&self) -> Option<ServiceId>;
}
impl TryToServiceId for ServiceId {
fn try_to_service_id(&self) -> Option<ServiceId> { Some(self.clone()) }
}
impl TryToServiceId for &Option<ServiceId> {
fn try_to_service_id(&self) -> Option<ServiceId> { (*self).clone() }
}
impl TryToServiceId for Option<ServiceId> {
fn try_to_service_id(&self) -> Option<ServiceId> { self.clone() }
}
pub trait Responder
where
Self: esb::Handler<ServiceBus>,
esb::Error<ServiceId>: From<Self::Error>,
{
#[inline]
fn enquirer(&self) -> Option<ClientId> { None }
fn report_success(
&mut self,
endpoints: &mut Endpoints,
msg: Option<impl ToString>,
) -> Result<(), Error> {
if let Some(ref message) = msg {
info!("{}", message.to_string());
}
if let Some(client) = self.enquirer() {
let status = bus::Status::Success(msg.map(|m| m.to_string()).into());
let report = CtlMsg::Report(Report { client, status });
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
ServiceId::LnpBroker,
BusMsg::Ctl(report),
)?;
}
Ok(())
}
fn report_progress(
&mut self,
endpoints: &mut Endpoints,
msg: impl ToString,
) -> Result<(), Error> {
let msg = msg.to_string();
info!("{}", msg);
if let Some(client) = self.enquirer() {
let status = bus::Status::Progress(msg);
let report = CtlMsg::Report(Report { client, status });
endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
ServiceId::LnpBroker,
BusMsg::Ctl(report),
)?;
}
Ok(())
}
fn report_failure(&mut self, endpoints: &mut Endpoints, failure: impl Into<Failure>) -> Error {
let failure = failure.into();
if let Some(client) = self.enquirer() {
let status = bus::Status::Failure(failure.clone());
let report = CtlMsg::Report(Report { client, status });
let _ = endpoints.send_to(
ServiceBus::Ctl,
self.identity(),
ServiceId::LnpBroker,
BusMsg::Ctl(report),
);
}
Error::Terminate(failure.to_string())
}
fn send_ctl(
&mut self,
endpoints: &mut Endpoints,
dest: impl TryToServiceId,
request: CtlMsg,
) -> Result<(), esb::Error<ServiceId>> {
if let Some(dest) = dest.try_to_service_id() {
endpoints.send_to(ServiceBus::Ctl, self.identity(), dest, BusMsg::Ctl(request))?;
}
Ok(())
}
#[inline]
fn send_rpc(
&self,
endpoints: &mut Endpoints,
client_id: ClientId,
message: impl Into<RpcMsg>,
) -> Result<(), esb::Error<ServiceId>> {
endpoints.send_to(
ServiceBus::Rpc,
self.identity(),
ServiceId::Client(client_id),
BusMsg::Rpc(message.into()),
)
}
}