use futures_util::StreamExt;
use service_channel::mpsc::{unbounded, UnboundedSender};
use service_channel::oneshot;
use std::future::Future;
use crate::{
address::Address,
envelop::{EnvelopWithMessage, Envelope},
Error, Handler, Message, Result, Service,
};
use crate::{RoutedTopic, Topic};
pub struct ServiceAddress<S> {
pub(crate) sender: UnboundedSender<Envelope<S>>,
}
impl<S> Clone for ServiceAddress<S> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
}
}
}
impl<S> ServiceAddress<S> {
pub fn is_stop(&self) -> bool {
self.sender.is_closed()
}
pub fn close_service(&self) {
self.sender.close_channel()
}
}
impl<S> ServiceAddress<S>
where
S: Service,
{
pub async fn call<M>(&self, message: M) -> Result<M::Result>
where
M: Message + Send + 'static,
S: Handler<M>,
M::Result: Send,
{
let (sender, receiver) = oneshot::channel::<M::Result>();
let env = Envelope::new_with_result_channel(message, Some(sender));
self.sender
.unbounded_send(env)
.map_err(|_| Error::ServiceStoped)?;
receiver.await.map_err(|_| Error::ServiceStoped)
}
pub fn send<M>(&self, message: M) -> Result<()>
where
M: Message + Send + 'static,
S: Handler<M>,
M::Result: Send,
{
let env = Envelope::new(message);
self.sender
.unbounded_send(env)
.map_err(|_| Error::ServiceStoped)?;
Ok(())
}
pub async fn subscribe<T>(&self) -> Result<T::Item>
where
T: Topic + RoutedTopic<S>,
{
let (sender, receiver) = oneshot::channel::<T::Item>();
let env = Envelope::<S>::new_subscribe_topic::<T>(sender);
self.sender
.unbounded_send(env)
.map_err(|_| Error::ServiceStoped)?;
receiver.await.map_err(|_| Error::ServiceStoped)
}
pub fn into_address<M>(self) -> (Address<M>, impl Future<Output = ()> + Send)
where
M: Message + Send + 'static,
S: Handler<M> + Send,
M::Result: Send,
{
let (sender, mut receiver) = unbounded::<Box<EnvelopWithMessage<M>>>();
let service_sender = self.sender;
let address = Address { sender };
let future = async move {
while let Some(boxed_env) = receiver.next().await {
let envelope = Envelope::from_boxed(boxed_env);
if service_sender.unbounded_send(envelope).is_err() {
break;
}
}
};
(address, future)
}
}