use crate::{
envelope::StreamEnvelope,
link::Link,
msg::StreamHandler,
service::{Service, ServiceContext},
};
use futures_core::stream::Stream;
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
impl<S> ServiceContext<S>
where
S: Service,
{
pub fn attach_stream<St>(&self, stream: St, stop: bool)
where
S: StreamHandler<St::Item>,
St: Stream + Send + Unpin + 'static,
St::Item: Send + 'static,
{
StreamService::start(stream, self.link(), stop)
}
}
struct StreamService<S, St> {
stream: St,
link: Link<S>,
stop: bool,
}
impl<S, St> StreamService<S, St>
where
S: Service + StreamHandler<St::Item>,
St: Stream + Send + Unpin + 'static,
St::Item: Send + 'static,
{
pub(crate) fn start(stream: St, link: Link<S>, stop: bool) {
let service = StreamService { stream, link, stop };
tokio::spawn(service);
}
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
Pin::new(&mut self.stream).poll_next(cx)
}
}
impl<S, St> Future for StreamService<S, St>
where
S: Service + StreamHandler<St::Item>,
St: Stream + Send + Unpin + 'static,
St::Item: Send + 'static,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
while let Some(item) = ready!(this.poll_next(cx)) {
if this.link.tx(StreamEnvelope::new(item)).is_err() {
return Poll::Ready(());
}
}
if this.stop {
this.link.stop();
}
Poll::Ready(())
}
}