use std::{
future::Future,
ops::DerefMut,
pin::Pin,
task::{Context, Poll, ready},
};
#[cfg(feature = "compat")]
use crate::FuturesCompat;
use crate::{Serialize, future::assert_future};
pub trait IoSink: Sink<Error = std::io::Error> {}
impl<T: ?Sized> IoSink for T where T: Sink<Error = std::io::Error> {}
/// Poll-based asynchronous sink that accepts items of any [`Serialize`] type.
///
/// The item type is a generic parameter of `start_send` rather than of the
/// trait, so a single sink value can be handed items of different types.
///
/// The helper futures in this module drive a sink as follows: `Feed` polls
/// `poll_ready` until it yields `Ready(Ok(()))` and only then calls
/// `start_send`; `Send` does the same and then polls `poll_flush`; `Flush`
/// polls `poll_flush`; `Close` polls `poll_close`.
// NOTE(review): the method set mirrors `futures_sink::Sink`; presumably
// implementors are expected to follow the same readiness/flush/close contract
// — confirm.
#[must_use = "sinks do nothing unless polled"]
pub trait Sink {
/// Error type produced by every operation on this sink.
type Error;
/// Polls whether the sink is ready for a `start_send` call.
///
/// `Feed` treats `Ready(Ok(()))` as permission to call `start_send`, and
/// propagates `Pending` and `Ready(Err(_))` to its own caller.
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Hands `item` to the sink.
///
/// This call is synchronous: it returns a `Result`, not a `Poll`.
fn start_send<Item: Serialize>(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error>;
/// Polls the sink to flush it.
// NOTE(review): presumably completes once previously sent items have been
// written out — confirm against implementors.
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
/// Polls the sink to close it.
// NOTE(review): presumably flushes and then shuts the sink down — confirm
// against implementors.
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>;
}
/// Forwards every [`Sink`] operation through a mutable reference to the
/// referenced sink.
///
/// `S: Unpin` is required so the referent can be re-pinned with [`Pin::new`].
impl<S: ?Sized> Sink for &mut S
where
    S: Sink + Unpin,
{
    type Error = S::Error;

    /// Delegates to `S::poll_ready` on the referenced sink.
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // `**self` peels the outer `Pin` and then the `&mut S` to reach `S`.
        S::poll_ready(Pin::new(&mut **self), cx)
    }

    /// Delegates to `S::start_send` on the referenced sink.
    fn start_send<Item: Serialize>(
        mut self: Pin<&mut Self>,
        item: Item,
    ) -> Result<(), Self::Error> {
        S::start_send(Pin::new(&mut **self), item)
    }

    /// Delegates to `S::poll_flush` on the referenced sink.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        S::poll_flush(Pin::new(&mut **self), cx)
    }

    /// Delegates to `S::poll_close` on the referenced sink.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        S::poll_close(Pin::new(&mut **self), cx)
    }
}
/// Forwards every [`Sink`] operation through a pinned pointer to the sink it
/// points at.
///
/// `P: Unpin` lets the outer `Pin<&mut Pin<P>>` be unwrapped with `get_mut`;
/// the inner `Pin<P>` is then re-borrowed as `Pin<&mut P::Target>`.
impl<P: DerefMut + Unpin> Sink for Pin<P>
where
    P::Target: Sink,
{
    type Error = <P::Target as Sink>::Error;

    /// Delegates to `poll_ready` on the pointee.
    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let target: Pin<&mut P::Target> = self.get_mut().as_mut();
        <P::Target as Sink>::poll_ready(target, cx)
    }

    /// Delegates to `start_send` on the pointee.
    fn start_send<Item: Serialize>(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
        let target: Pin<&mut P::Target> = self.get_mut().as_mut();
        <P::Target as Sink>::start_send(target, item)
    }

    /// Delegates to `poll_flush` on the pointee.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let target: Pin<&mut P::Target> = self.get_mut().as_mut();
        <P::Target as Sink>::poll_flush(target, cx)
    }

    /// Delegates to `poll_close` on the pointee.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let target: Pin<&mut P::Target> = self.get_mut().as_mut();
        <P::Target as Sink>::poll_close(target, cx)
    }
}
pub trait SinkExt: Sink {
fn close(&mut self) -> Close<'_, Self>
where
Self: Unpin,
{
assert_future::<Result<(), Self::Error>, _>(Close::new(self))
}
fn send<Item: Serialize>(&mut self, item: Item) -> Send<'_, Self, Item>
where
Self: Unpin,
{
assert_future::<Result<(), Self::Error>, _>(Send::new(self, item))
}
fn feed<Item: Serialize>(&mut self, item: Item) -> Feed<'_, Self, Item>
where
Self: Unpin,
{
assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item))
}
fn flush(&mut self) -> Flush<'_, Self>
where
Self: Unpin,
{
assert_future::<Result<(), Self::Error>, _>(Flush::new(self))
}
#[cfg(feature = "compat")]
fn compat_sink<Item: Serialize>(self) -> FuturesCompat<Self, Item>
where
Self: Sized,
{
assert_futures_sink(FuturesCompat::new(self))
}
}
impl<S: Sink + ?Sized> SinkExt for S {}
/// Future that closes a borrowed sink.
///
/// Each poll forwards to `poll_close` on the sink and returns its result
/// unchanged.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Close<'a, Si: ?Sized> {
    /// Exclusive borrow of the sink being closed.
    sink: &'a mut Si,
}

// Explicit impl: the future is `Unpin` whenever the sink type is.
impl<Si: ?Sized> Unpin for Close<'_, Si> where Si: Unpin {}

impl<'a, Si: ?Sized> Close<'a, Si>
where
    Si: Sink + Unpin,
{
    /// Creates a close future borrowing `sink`.
    fn new(sink: &'a mut Si) -> Self {
        Close { sink }
    }
}

impl<Si: ?Sized> Future for Close<'_, Si>
where
    Si: Sink + Unpin,
{
    type Output = Result<(), Si::Error>;

    /// Polls `poll_close` on the borrowed sink.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        // `Si: Unpin`, so the reborrowed sink can be pinned directly.
        Si::poll_close(Pin::new(&mut *this.sink), cx)
    }
}
/// Future that sends one item into a borrowed sink and then flushes it.
///
/// It wraps a [`Feed`]: while the item is still pending the inner feed is
/// polled; once the item has been handed over, `poll_flush` is polled on the
/// sink and its result becomes the output.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Send<'a, Si: ?Sized, Item> {
    /// Inner future that owns the item and the sink borrow.
    feed: Feed<'a, Si, Item>,
}

// Explicit impl: the future is `Unpin` whenever the sink type is, regardless
// of `Item`.
impl<Si: ?Sized, Item> Unpin for Send<'_, Si, Item> where Si: Unpin {}

impl<'a, Si: ?Sized, Item> Send<'a, Si, Item>
where
    Si: Sink + Unpin,
{
    /// Creates a send future that will deliver `item` to `sink`.
    fn new(sink: &'a mut Si, item: Item) -> Self {
        let feed = Feed::new(sink, item);
        Send { feed }
    }
}

impl<Si: ?Sized, Item> Future for Send<'_, Si, Item>
where
    Si: Sink + Unpin,
    Item: Serialize,
{
    type Output = Result<(), Si::Error>;

    /// Feeds the item if it has not been handed over yet, then flushes.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let feed = &mut self.get_mut().feed;

        // Phase 1: hand the item to the sink (skipped on re-polls after the
        // item was already delivered).
        if feed.is_item_pending() {
            ready!(Pin::new(&mut *feed).poll(cx))?;
            debug_assert!(!feed.is_item_pending());
        }

        // Phase 2: the flush result is exactly this future's output.
        feed.sink_pin_mut().poll_flush(cx)
    }
}
/// Future that hands one item to a borrowed sink without flushing.
///
/// It polls `poll_ready` until the sink reports readiness and then passes the
/// item to `start_send`.
///
/// # Panics
///
/// Polling again after the item has been taken panics once `poll_ready`
/// reports `Ready(Ok(()))`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Feed<'a, Si: ?Sized, Item> {
    /// Exclusive borrow of the sink receiving the item.
    sink: &'a mut Si,
    /// The item to send; `None` once it has been handed to `start_send`.
    item: Option<Item>,
}

// Explicit impl: the future is `Unpin` whenever the sink type is, regardless
// of `Item`.
impl<Si: ?Sized, Item> Unpin for Feed<'_, Si, Item> where Si: Unpin {}

impl<'a, Si: ?Sized, Item> Feed<'a, Si, Item>
where
    Si: Sink + Unpin,
{
    /// Creates a feed future that will deliver `item` to `sink`.
    fn new(sink: &'a mut Si, item: Item) -> Self {
        let item = Some(item);
        Self { sink, item }
    }

    /// Re-borrows the sink as a pinned mutable reference.
    fn sink_pin_mut(&mut self) -> Pin<&mut Si> {
        Pin::new(&mut *self.sink)
    }

    /// Returns `true` while the item has not yet been passed to `start_send`.
    fn is_item_pending(&self) -> bool {
        self.item.is_some()
    }
}

impl<Si: ?Sized, Item> Future for Feed<'_, Si, Item>
where
    Si: Sink + Unpin,
    Item: Serialize,
{
    type Output = Result<(), Si::Error>;

    /// Waits for readiness, then sends the stored item.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Feed { sink, item } = self.get_mut();

        // Readiness is checked before the item is taken, so a `Pending` or
        // error result leaves the item in place.
        ready!(Pin::new(&mut **sink).poll_ready(cx))?;

        let payload = item.take().expect("polled Feed after completion");
        Poll::Ready(Pin::new(&mut **sink).start_send(payload))
    }
}
/// Future that flushes a borrowed sink.
///
/// Each poll forwards to `poll_flush` on the sink and returns its result
/// unchanged.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Flush<'a, Si: ?Sized> {
    /// Exclusive borrow of the sink being flushed.
    sink: &'a mut Si,
}

// Explicit impl: the future is `Unpin` whenever the sink type is.
impl<Si: ?Sized> Unpin for Flush<'_, Si> where Si: Unpin {}

impl<'a, Si: ?Sized> Flush<'a, Si>
where
    Si: Sink + Unpin,
{
    /// Creates a flush future borrowing `sink`.
    fn new(sink: &'a mut Si) -> Self {
        Flush { sink }
    }
}

impl<Si: ?Sized> Future for Flush<'_, Si>
where
    Si: Sink + Unpin,
{
    type Output = Result<(), Si::Error>;

    /// Polls `poll_flush` on the borrowed sink.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        // `Si: Unpin`, so the reborrowed sink can be pinned directly.
        Si::poll_flush(Pin::new(&mut *this.sink), cx)
    }
}
/// Compile-time check that `sink` implements `futures_sink::Sink<Item>`.
///
/// Returns `sink` unchanged; the function exists only for its trait bound.
// NOTE(review): the only caller visible in this file (`SinkExt::compat_sink`)
// is gated on feature `compat`, while this helper is not — confirm whether it
// should carry the same `#[cfg]`.
pub(crate) fn assert_futures_sink<S, Item>(sink: S) -> S
where
    S: futures_sink::Sink<Item>,
{
    sink
}