use futures_util::{
stream::{empty, select, Empty, Select},
Stream, StreamExt,
};
use service_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use std::future::Future;
use crate::{Envelope, Error, Result, RoutedTopic, Service, ServiceAddress, Topic};
/// Execution context for a service of type `S`.
///
/// The context owns both halves of an internal unbounded envelope channel.
/// The receiving half is merged with an additional envelope stream `T`, so
/// envelopes can arrive either through the channel or from that stream.
pub struct Context<S, T>
where
    T: Stream<Item = Envelope<S>> + Unpin,
{
    /// Sending half of the internal channel; a clone of it is placed in
    /// every `ServiceAddress` returned by `addr`.
    sender: UnboundedSender<Envelope<S>>,
    /// Receiving half of the internal channel combined with the extra
    /// stream `T` through `select`.
    receiver: Select<UnboundedReceiver<Envelope<S>>, T>,
}
/// `Default` is provided only for contexts without an extra envelope stream.
impl<S> Default for Context<S, Empty<Envelope<S>>> {
    /// Builds the same context as [`Context::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl<S> Context<S, Empty<Envelope<S>>> {
    /// Creates a context with no additional envelope stream.
    ///
    /// The extra stream slot is filled with an empty stream, so the only
    /// envelopes this context receives are those sent through its own channel.
    pub fn new() -> Self {
        let no_extra_input = empty();
        Self::with_stream(no_extra_input)
    }
}
impl<S, T> Context<S, T>
where
T: Stream<Item = Envelope<S>> + Unpin,
{
pub fn with_stream(stream: T) -> Self {
let (sender, receiver) = unbounded();
Self {
sender,
receiver: select(receiver, stream),
}
}
pub fn addr(&self) -> ServiceAddress<S> {
ServiceAddress {
sender: self.sender.clone(),
}
}
pub fn publish<TopicT>(&self, topic: TopicT, item: TopicT::Item) -> Result<()>
where
TopicT: Topic + RoutedTopic<S>,
S: Service,
{
let env = Envelope::<S>::new_publish_topic::<TopicT>(topic, item);
self.sender
.unbounded_send(env)
.map_err(|_| Error::ServiceStoped)?;
Ok(())
}
pub fn stop(&mut self) {
self.sender.close_channel()
}
pub fn stream(&mut self) -> &mut T {
let (_, stream) = self.receiver.get_mut();
stream
}
}
impl<S, T> Context<S, T>
where
    S: Service<Stream = T> + Send + Sized,
    T: Stream<Item = Envelope<S>> + Unpin + Send,
{
    /// Consumes the context together with `service` and returns the
    /// service's address plus the future that drives it.
    ///
    /// When polled, the future calls `started` on the service, then hands
    /// each received envelope to `Envelope::handle` one at a time until the
    /// merged receiver yields `None`, and finally calls `stopped`. Nothing
    /// happens until the returned future is polled.
    pub fn run(mut self, mut service: S) -> (ServiceAddress<S>, impl Future<Output = ()> + Send) {
        // Take the address first: the context is moved into the future below.
        let address = self.addr();
        let driver = async move {
            service.started(&mut self).await;
            loop {
                match self.receiver.next().await {
                    Some(envelope) => {
                        envelope.handle(&mut service, &mut self).await;
                    }
                    None => break,
                }
            }
            service.stopped(&mut self).await;
        };
        (address, driver)
    }
}