mod core;
pub mod subscription;
pub mod connector;
use self::core::*;
use self::subscription::*;
use super::{
topic::prefix_topic,
transport::Transport,
};
use crate::protocol::*;
use crate::err::*;
use std::{time::Duration};
use tokio::{
sync::mpsc::unbounded_channel, sync::mpsc::UnboundedSender, task::JoinHandle, time::timeout,
};
const ACK_TIMEOUT_S: u64 = 5;
pub struct Client {
task_sender: tokio::sync::mpsc::UnboundedSender<Task>,
core_join_handle: JoinHandle<BusResult<()>>
}
impl Client {
pub(crate) fn new(transport: impl Transport<ProtocolClient, ProtocolServer>) -> BusResult<Client>
{
let (task_sender, task_receiver) = tokio::sync::mpsc::unbounded_channel();
let core_join_handle = ClientCore::start(transport, task_receiver)?;
let client = Client { task_sender, core_join_handle };
Ok(client)
}
pub async fn stop_bus(&mut self) -> BusResult<()> {
let (callback_ack_sender, mut callback_ack_receiver) = tokio::sync::oneshot::channel();
let task = Task::StopBus(TaskStopBus {
callback_ack: callback_ack_sender,
});
self.task_sender.send(task)?;
expect_ack(&mut callback_ack_receiver).await?;
Ok(())
}
pub async fn stop(self) -> BusResult<()> {
let _ = self.task_sender.send(Task::Stop); self.core_join_handle.await??;
Ok(())
}
pub async fn serve<TProtocol>(
& self,
topic: &str,
) -> BusResult<RequestSubscription<TProtocol>>
where
TProtocol: RequestProtocol,
{
let topic = prefix_topic(TProtocol::prefix(), topic);
let (callback_ack_sender, mut callback_ack_receiver) = tokio::sync::oneshot::channel();
let (callback_req_sender, mut callback_req_receiver) =
tokio::sync::mpsc::unbounded_channel();
let (typed_request_sender, typed_request_receiver) = unbounded_channel();
let subscription =
RequestSubscription::new(&topic, typed_request_receiver, self.task_sender.clone());
let task = Task::Srv(TaskSrv {
msg: SrvMsg { topic },
callback_ack: callback_ack_sender,
callback_req: callback_req_sender,
});
self.task_sender.send(task)?;
expect_ack(&mut callback_ack_receiver).await?;
tokio::spawn(async move {
while let Some((req_id, msg_req)) = callback_req_receiver.recv().await {
let data: Vec<u8> = msg_req.payload.into();
let req: TProtocol = crate::transport::cbor_codec::deser(&data[..]).unwrap();
let send_result = typed_request_sender.send((msg_req.topic, req_id, req));
if send_result.is_err() {
return;
}
}
});
Ok(subscription)
}
pub async fn request<TProtocol>(
&self,
topic: &str,
req: &TProtocol,
) -> BusResult<TProtocol::Rsp>
where
TProtocol: RequestProtocol,
{
let topic = prefix_topic(TProtocol::prefix(), topic);
let payload = crate::transport::cbor_codec::ser(req)?;
let (callback_ack_sender, mut callback_ack_receiver) = tokio::sync::oneshot::channel();
let (callback_rsp_sender, callback_rsp_receiver) = tokio::sync::oneshot::channel();
let task = Task::Req(TaskReq {
msg: ReqMsg {
topic: topic.clone(), payload: payload.into(),
},
callback_ack: callback_ack_sender,
callback_rsp: callback_rsp_sender,
});
self.task_sender.send(task)?;
expect_ack(&mut callback_ack_receiver).await?;
let rsp = callback_rsp_receiver.await?;
match rsp.status {
RspMsgStatus::Ok => {
let data: Vec<u8> = rsp.payload.into();
let rsp = crate::transport::cbor_codec::deser(&data[..])?;
Ok(rsp)
}
RspMsgStatus::Timeout => Err(BusError::RequestFailedTimeout)
}
}
pub async fn request_bytes(
&mut self,
topic_with_prefix: &str,
payload: Vec<u8>,
) -> BusResult<Vec<u8>> {
let (callback_ack_sender, mut callback_ack_receiver) = tokio::sync::oneshot::channel();
let (callback_rsp_sender, callback_rsp_receiver) = tokio::sync::oneshot::channel();
let task = Task::Req(TaskReq {
msg: ReqMsg {
topic: topic_with_prefix.to_string(),
payload: payload.into(),
},
callback_ack: callback_ack_sender,
callback_rsp: callback_rsp_sender,
});
self.task_sender.send(task)?;
expect_ack(&mut callback_ack_receiver).await?;
let rsp = callback_rsp_receiver.await?;
Ok(rsp.payload.into())
}
pub async fn respond<TProtocol>(
&self,
request_id: MsgId,
rsp: &TProtocol::Rsp,
) -> BusResult<()>
where
TProtocol: RequestProtocol,
{
let payload = crate::transport::cbor_codec::ser(rsp)?;
self._respond(request_id, RspMsgStatus::Ok, payload).await
}
pub async fn subscribe<TProtocol>(
&self,
topic: &str,
) -> BusResult<Subscription<(String, TProtocol)>>
where
TProtocol: PublishProtocol,
{
let (callback_pub_sender, callback_pub_receiver) =
unbounded_channel::<(String, TProtocol)>();
let subscription_into = self
.subscribe_into::<TProtocol>(topic, callback_pub_sender)
.await?;
let subscription = Subscription::new(callback_pub_receiver, subscription_into);
Ok(subscription)
}
pub async fn subscribe_into<TProtocol>(
&self,
topic: &str,
callback_sender: UnboundedSender<(String, TProtocol)>,
) -> BusResult<SubscriptionInto>
where
TProtocol: PublishProtocol,
{
let topic = prefix_topic(TProtocol::prefix(), topic);
let (callback_ack_sender, mut callback_ack_receiver) = tokio::sync::oneshot::channel();
let (callback_pub_sender, mut callback_pub_receiver) = unbounded_channel::<PubMsg>();
tokio::spawn(async move {
while let Some(msg_pub) = callback_pub_receiver.recv().await {
let data: Vec<u8> = msg_pub.payload.into();
let msg: TProtocol = crate::transport::cbor_codec::deser(&data)
.map_err(|e| {
BusError::MalformedMessage(TProtocol::prefix().to_string(), e.to_string())
})
.unwrap();
let result = callback_sender.send((msg_pub.topic, msg));
if result.is_err() {
return;
}
}
});
let task = Task::Sub(TaskSub {
msg: SubMsg {
topic: topic.clone(),
},
callback_ack: callback_ack_sender,
callback_pub: callback_pub_sender,
});
self.task_sender.send(task)?;
expect_ack(&mut callback_ack_receiver).await?;
let subscription = SubscriptionInto::new(&topic, self.task_sender.clone());
Ok(subscription)
}
pub async fn subscribe_bytes(
&self,
topic_with_prefix: &str,
) -> BusResult<Subscription<PubMsg>> {
let (sender, receiver) = unbounded_channel::<PubMsg>();
let sub_into = self.subscribe_bytes_into(topic_with_prefix, sender).await?;
let sub = Subscription::new(receiver, sub_into);
Ok(sub)
}
pub async fn subscribe_bytes_into(
&self,
topic_with_prefix: &str,
sender: UnboundedSender<PubMsg>,
) -> BusResult<SubscriptionInto> {
let (callback_ack_sender, mut callback_ack_receiver) = tokio::sync::oneshot::channel();
let task = Task::Sub(TaskSub {
msg: SubMsg {
topic: topic_with_prefix.into(),
},
callback_ack: callback_ack_sender,
callback_pub: sender,
});
self.task_sender.send(task)?;
expect_ack(&mut callback_ack_receiver).await?;
let subscription = SubscriptionInto::new(topic_with_prefix, self.task_sender.clone());
Ok(subscription)
}
pub async fn publish<TProtocol>(&self, topic: &str, msg: &TProtocol) -> BusResult<usize>
where
TProtocol: PublishProtocol,
{
let topic = prefix_topic(TProtocol::prefix(), topic);
let payload = crate::transport::cbor_codec::ser(msg)?;
let num_recipients = self.publish_bytes(&topic, payload).await?;
Ok(num_recipients)
}
pub async fn publish_bytes(
&self,
topic_with_prefix: &str,
payload: Vec<u8>,
) -> BusResult<usize> {
let (callback_ack_sender, mut callback_ack_receiver) = tokio::sync::oneshot::channel();
let task = Task::Pub(TaskPub {
msg: PubMsg {
topic: topic_with_prefix.to_string(),
payload: payload.into(),
},
callback_ack: callback_ack_sender,
});
self.task_sender.send(task)?;
let num_recipients = expect_ack(&mut callback_ack_receiver).await?.unwrap();
Ok(num_recipients)
}
async fn _respond(
&self,
req_id: MsgId,
status: RspMsgStatus,
payload: Vec<u8>,
) -> BusResult<()> {
let (callback_ack_sender, mut callback_ack_receiver) = tokio::sync::oneshot::channel();
let task = Task::Rsp(TaskRsp {
msg: RspMsg {
req_id,
status,
payload: payload.into(),
},
callback_ack: callback_ack_sender,
});
self.task_sender.send(task)?;
expect_ack(&mut callback_ack_receiver).await?;
Ok(())
}
}
async fn expect_ack(
receiver: &mut tokio::sync::oneshot::Receiver<AckMsg>,
) -> BusResult<Option<usize>> {
match timeout(Duration::from_secs(ACK_TIMEOUT_S), receiver).await {
Ok(ack) => match ack {
Ok(ack) => match ack.err {
Some(err) => Err(BusError::RequestFailed(err.to_string())),
None => Ok(ack.num_recipients),
},
_ => Err(BusError::RequestFailedChannelClosed),
},
_ => Err(BusError::RequestFailedTimeout),
}
}