use std::{future, task::Poll};
use futures::{FutureExt, future::BoxFuture, task::Context};
use tower::Service;
use super::PipelineError;
#[derive(Clone)]
pub struct SinkService<TSink>(TSink);
impl<TSink> SinkService<TSink> {
pub fn new(sink: TSink) -> Self {
SinkService(sink)
}
}
impl<T> Service<T> for SinkService<tokio::sync::mpsc::Sender<T>>
where T: Send + 'static
{
type Error = PipelineError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
type Response = ();
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, item: T) -> Self::Future {
let sink = self.0.clone();
async move {
sink.send(item)
.await
.map_err(|_| anyhow::anyhow!("sink closed in sink service"))
}
.boxed()
}
}
impl<T> Service<T> for SinkService<tokio::sync::mpsc::UnboundedSender<T>>
where T: Send + 'static
{
type Error = PipelineError;
type Future = future::Ready<Result<Self::Response, Self::Error>>;
type Response = ();
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, item: T) -> Self::Future {
let sink = self.0.clone();
let result = sink
.send(item)
.map_err(|_| anyhow::anyhow!("sink closed in sink service"));
future::ready(result)
}
}