use std::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use futures::Sink;
/// A sink adapter that runs every incoming item of type `I` through a translator `F` and
/// forwards the translated value to the wrapped sink `S`.
///
/// Items for which the translator yields `None` never reach the wrapped sink.
pub struct TranslateSink<F, S, I> {
/// Translator applied to each incoming item (used through the `Translate` trait).
translater: F,
/// Wrapped sink that receives the translated items.
sink: S,
/// Zero-sized marker for the input item type `I`, which no other field mentions.
_i: PhantomData<I>,
}
// Unconditionally `Unpin`: the `Sink` impl in this file only re-pins the inner sink via
// `Pin::new` (bounded by `S: Unpin` there) and never pins the translator or the marker field.
impl<F, S, I> Unpin for TranslateSink<F, S, I> {}
impl<F, S, I> TranslateSink<F, S, I> {
pub fn new(sink: S, translater: F) -> Self {
Self {
translater,
sink,
_i: PhantomData,
}
}
}
/// `Sink` implementation that translates each item before passing it to the inner sink.
///
/// Readiness, flushing and closing are delegated unchanged to the inner sink, and its error
/// type is reused as-is. No `Unpin` bound is placed on the item type `I`: the adapter is
/// unconditionally `Unpin` and never stores an `I`, so only the inner sink must be `Unpin`.
impl<F, S, I> Sink<I> for TranslateSink<F, S, I>
where
    F: Translate<I>,
    S: Sink<F::Output> + Unpin,
{
    type Error = S::Error;

    /// Reports whether the inner sink is ready to accept an item.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.get_mut().sink).poll_ready(cx)
    }

    /// Translates `item` and, if a value is produced, starts sending it on the inner sink.
    ///
    /// When the translator returns `None` the item is discarded and `Ok(())` is returned
    /// without touching the inner sink.
    fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
        // `get_mut` is available because `TranslateSink` is `Unpin`; it yields a plain
        // `&mut Self` so the translator and the sink can be borrowed independently.
        let this = self.get_mut();
        match this.translater.translate(item) {
            Some(translated) => Pin::new(&mut this.sink).start_send(translated),
            None => Ok(()),
        }
    }

    /// Flushes the inner sink.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.get_mut().sink).poll_flush(cx)
    }

    /// Closes the inner sink.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Pin::new(&mut self.get_mut().sink).poll_close(cx)
    }
}
/// Converts an input of type `I` into an optional output value.
///
/// Closures of the form `FnMut(I) -> Option<O>` implement this trait through the blanket impl
/// in this file.
pub trait Translate<I> {
/// The type produced when a translation yields a value.
type Output;
/// Translates `input`, returning `None` when no output is produced for it.
fn translate(&mut self, input: I) -> Option<Self::Output>;
}
/// Blanket implementation letting any `FnMut(I) -> Option<O>` closure or function serve as a
/// translator, with the closure's `O` as the output type.
impl<I, F, O> Translate<I> for F
where
    F: FnMut(I) -> Option<O>,
{
    type Output = O;

    /// Calls the underlying closure with `input` and returns its result unchanged.
    fn translate(&mut self, input: I) -> Option<O> {
        self(input)
    }
}
#[cfg(test)]
mod test {
use futures::{SinkExt, StreamExt};
use tokio::sync::mpsc;
use super::*;
// NOTE(review): `runtime` is not referenced anywhere in this module's visible code — confirm
// whether this import is still needed.
use crate::runtime;
/// Sends one odd and one even number through a `TranslateSink` whose translator only produces
/// output for even inputs, then checks that the first value received is the even one.
#[tokio::test]
async fn check_translates() {
let (tx, mut rx) = mpsc::channel(1);
// Translator: even numbers become "Even: <n>", odd numbers produce no output.
let mut translate_sink = TranslateSink::new(tx, |input: u32| {
if input % 2 == 0 {
Some(format!("Even: {}", input))
} else {
None
}
});
// 123 is odd, so the translator returns `None` and nothing is forwarded to `tx`.
translate_sink.send(123).await.unwrap();
// 124 is even, so its formatted string is forwarded.
translate_sink.send(124).await.unwrap();
// NOTE(review): `next()` comes from `StreamExt`, so this presumably relies on the receiver
// implementing `Stream` — confirm against the tokio version in use.
let result = rx.next().await.unwrap();
assert_eq!(result, "Even: 124");
}
}