use crate::{
envelope::ErrorEnvelope,
link::{Link, LinkError},
msg::ErrorHandler,
service::{Service, ServiceContext},
};
use futures_sink::Sink;
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::mpsc;
impl<S: Service> ServiceContext<S> {
    /// Moves `sink` into a background `SinkService` task and returns the
    /// [`SinkLink`] used to send commands to it.
    ///
    /// The task is started with this context's own link (`self.link()`).
    /// The `S: ErrorHandler<Si::Error>` bound is what lets errors produced
    /// by the sink be forwarded to the service through that link.
    pub fn attach_sink<Si, I>(&self, sink: Si) -> SinkLink<I>
    where
        S: ErrorHandler<Si::Error>,
        Si: Sink<I> + Send + Unpin + 'static,
        Si::Error: Send + 'static,
        I: Send + 'static,
    {
        let owner = self.link();
        SinkService::start(sink, owner)
    }
}
/// Handle for sending commands to a spawned sink service.
///
/// Wraps the sending half of an unbounded tokio mpsc channel that carries
/// `SinkMessage<I>` values; the receiving half is owned by the `SinkService`
/// task created in `SinkService::start`.
pub struct SinkLink<I>(mpsc::UnboundedSender<SinkMessage<I>>);
impl<I> Clone for SinkLink<I> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
/// Commands sent from a `SinkLink` to the `SinkService` task.
enum SinkMessage<I> {
/// Send `item` into the sink; when `flush` is true the sink is flushed
/// right after the item has been handed over.
Feed { item: I, flush: bool },
/// Flush the sink without sending a new item.
Flush,
/// End the service task.
Stop,
}
impl<I> SinkLink<I>
where
I: Send,
{
pub fn sink(&self, item: I) -> Result<(), LinkError> {
self.0
.send(SinkMessage::Feed { item, flush: true })
.map_err(|_| LinkError::Send)
}
pub fn feed(&self, item: I) -> Result<(), LinkError> {
self.0
.send(SinkMessage::Feed { item, flush: false })
.map_err(|_| LinkError::Send)
}
pub fn flush(&self) -> Result<(), LinkError> {
self.0.send(SinkMessage::Flush).map_err(|_| LinkError::Send)
}
pub fn stop(&self) -> Result<(), LinkError> {
self.0.send(SinkMessage::Stop).map_err(|_| LinkError::Send)
}
}
/// Task state that owns a sink and drives it from commands received over a
/// channel.
///
/// The struct itself implements `Future` and is run with `tokio::spawn` from
/// `SinkService::start`.
struct SinkService<S, Si, I> {
/// The sink being driven.
sink: Si,
/// Link to the owning service `S`; sink errors are forwarded through it.
link: Link<S>,
/// Receiving half of the command channel fed by `SinkLink`.
rx: mpsc::UnboundedReceiver<SinkMessage<I>>,
/// Operation currently in progress, or `None` when the task is idle and
/// ready to take the next command from `rx`.
action: Option<FutState<I>>,
}
impl<S, Si, I> SinkService<S, Si, I>
where
    S: Service + ErrorHandler<Si::Error>,
    Si: Sink<I> + Send + Unpin + 'static,
    Si::Error: Send + 'static,
    I: Send + 'static,
{
    /// Spawns a task that drives `sink` and returns the handle for
    /// commanding it.
    ///
    /// Creates an unbounded command channel, gives the receiving half to a
    /// new `SinkService`, and runs that service with `tokio::spawn`, so this
    /// must be called from within a Tokio runtime. Errors produced by the
    /// sink are later forwarded through `link`.
    pub(crate) fn start(sink: Si, link: Link<S>) -> SinkLink<I> {
        let (tx, rx) = mpsc::unbounded_channel();
        // The join handle is dropped on purpose: the task runs detached and
        // ends when its `poll` returns `Ready`.
        tokio::spawn(SinkService {
            sink,
            link,
            rx,
            action: None,
        });
        SinkLink(tx)
    }

    /// Advances the operation described by `state` on `sink`.
    ///
    /// For `FutState::Feed`, a still-present item is handed over with
    /// `poll_ready` followed by `start_send`; the item is only taken out of
    /// `state` once the sink reports readiness, so a `Pending` result can be
    /// retried with the same state. A flush is then driven when the feed
    /// requested one, and always for `FutState::Flush`.
    ///
    /// Returns `Ready(Ok(()))` when the whole operation has finished,
    /// `Ready(Err(_))` on the first sink error, and `Pending` while the sink
    /// is not ready or the flush is incomplete.
    fn poll_sink(
        mut sink: Pin<&mut Si>,
        state: &mut FutState<I>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Si::Error>> {
        match state {
            FutState::Flush => {}
            FutState::Feed {
                item: slot,
                flush: then_flush,
            } => {
                // On a re-poll after the item was already sent, the slot is
                // empty and only the flush (if any) remains.
                if slot.is_some() {
                    ready!(sink.as_mut().poll_ready(cx))?;
                    let value = slot.take().expect("polled feed after completion");
                    sink.as_mut().start_send(value)?;
                }
                if !*then_flush {
                    return Poll::Ready(Ok(()));
                }
            }
        }
        sink.poll_flush(cx)
    }
}
/// Progress of the sink operation that `SinkService` is currently driving.
enum FutState<I> {
/// Sending an item, optionally followed by a flush.
Feed {
/// Item still to be handed to the sink; becomes `None` once it has been
/// taken out for `start_send`.
item: Option<I>,
/// Whether the sink is flushed after the item has been sent.
flush: bool,
},
/// Flushing the sink only.
Flush,
}
// In the code visible here `FutState` is only accessed through plain `&mut`
// references, never through a `Pin`, so it is declared `Unpin` for every `I`.
// The automatic impl would require `I: Unpin`, and that requirement would
// carry over to `SinkService`, whose `poll` relies on `Pin::get_mut`.
impl<I> Unpin for FutState<I> {}
impl<S, Si, I> Future for SinkService<S, Si, I>
where
    S: Service + ErrorHandler<Si::Error>,
    Si: Sink<I> + Send + Unpin + 'static,
    Si::Error: Send + 'static,
    I: Send + 'static,
{
    type Output = ();

    /// Runs the service loop: finish the operation in progress, then pull the
    /// next command from the channel, and repeat.
    ///
    /// Completes when a `Stop` command arrives, when the command channel
    /// yields `None`, or when forwarding a sink error through the link fails.
    /// A sink error that is forwarded successfully does not stop the loop;
    /// the failed operation is dropped and the next command is processed.
    ///
    /// NOTE(review): on completion the sink is neither flushed nor closed
    /// here; whether items sent without a flush are lost then depends on the
    /// sink implementation — confirm this is intended.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let service = self.get_mut();
        loop {
            // Drive any operation in progress to completion before taking
            // another command.
            if let Some(state) = service.action.as_mut() {
                let outcome = ready!(Self::poll_sink(Pin::new(&mut service.sink), state, cx));
                if let Err(err) = outcome {
                    let forwarded = service.link.tx(ErrorEnvelope::new(err));
                    if forwarded.is_err() {
                        // The error could not be delivered, so stop the task.
                        return Poll::Ready(());
                    }
                }
                service.action = None;
                continue;
            }

            // Idle: wait for the next command and turn it into an operation.
            let next = match ready!(service.rx.poll_recv(cx)) {
                Some(SinkMessage::Feed { item, flush }) => FutState::Feed {
                    item: Some(item),
                    flush,
                },
                Some(SinkMessage::Flush) => FutState::Flush,
                Some(SinkMessage::Stop) | None => return Poll::Ready(()),
            };
            service.action = Some(next);
        }
    }
}