use std::thread::sleep;
use std::time::Duration;
use internet2::ZmqSocketType;
use microservices::esb;
use crate::bus::{ctl::CtlMsg, info::InfoMsg, BusMsg, ServiceBus};
use crate::service::Endpoints;
use crate::service::ServiceConfig;
use crate::{Error, LogStyle, ServiceId};
#[repr(C)]
pub struct Client {
identity: ServiceId,
response_queue: std::collections::VecDeque<BusMsg>,
esb: esb::Controller<ServiceBus, BusMsg, Handler>,
}
impl Client {
pub fn with(config: ServiceConfig) -> Result<Self, Error> {
debug!("Setting up RPC client...");
let identity = ServiceId::client();
let esb = esb::Controller::with(
map! {
ServiceBus::Ctl => esb::BusConfig::with_addr(
config.ctl_endpoint,
ZmqSocketType::RouterConnect,
Some(ServiceId::router())
),
ServiceBus::Info => esb::BusConfig::with_addr(
config.info_endpoint,
ZmqSocketType::RouterConnect,
Some(ServiceId::router()),
)
},
Handler {
identity: identity.clone(),
},
)?;
sleep(Duration::from_secs_f32(0.1));
Ok(Self {
identity,
response_queue: empty!(),
esb,
})
}
pub fn identity(&self) -> ServiceId {
self.identity.clone()
}
pub fn request_info(&mut self, daemon: ServiceId, req: InfoMsg) -> Result<(), Error> {
debug!("Executing {}", req);
self.esb
.send_to(ServiceBus::Info, daemon, BusMsg::Info(req))?;
Ok(())
}
pub fn request_ctl(&mut self, daemon: ServiceId, req: CtlMsg) -> Result<(), Error> {
debug!("Executing {}", req);
self.esb
.send_to(ServiceBus::Ctl, daemon, BusMsg::Ctl(req))?;
Ok(())
}
pub fn response(&mut self) -> Result<BusMsg, Error> {
if self.response_queue.is_empty() {
for rep in self.esb.recv_poll()? {
self.response_queue.push_back(rep.request);
}
}
Ok(self
.response_queue
.pop_front()
.expect("We always have at least one element"))
}
pub fn report_failure(&mut self) -> Result<BusMsg, Error> {
match self.response()? {
BusMsg::Ctl(CtlMsg::Failure(fail)) => Err(Error::Farcaster(fail.info)),
resp => Ok(resp),
}
}
pub fn report_response_or_fail(&mut self) -> Result<(), Error> {
let resp = self.report_failure()?;
println!("{}", resp);
Ok(())
}
pub fn report_progress(&mut self) -> Result<(), Error> {
loop {
match self.report_failure() {
Err(e) =>
{
break Err(e)
}
Ok(BusMsg::Ctl(CtlMsg::Success(s))) => {
println!("{}", s.bright_green_bold());
break Ok(());
}
Ok(req) => println!("{}", req),
}
}
}
}
pub struct Handler {
identity: ServiceId,
}
impl esb::Handler<ServiceBus> for Handler {
type Request = BusMsg;
type Error = Error;
fn identity(&self) -> ServiceId {
self.identity.clone()
}
fn handle(
&mut self,
_endpoints: &mut Endpoints,
_bus: ServiceBus,
_addr: ServiceId,
_request: BusMsg,
) -> Result<(), Error> {
Ok(())
}
fn handle_err(&mut self, _: &mut Endpoints, err: esb::Error<ServiceId>) -> Result<(), Error> {
Err(Error::Esb(err))
}
}