use std::marker::PhantomPinned;
use std::{future::Future, ops::DerefMut, pin::Pin, task::Poll};
use crate::Context;
use pin_project::pin_project;
mod chain;
mod errors;
mod filter;
#[cfg(feature = "logging")]
mod sink_log;
pub use errors::*;
pub trait Sink {
type Item;
fn poll_send(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
value: Self::Item,
) -> PollSend<Self::Item>;
fn send(&mut self, value: Self::Item) -> SendFuture<Self> {
SendFuture::new(self, value)
}
fn try_send(&mut self, value: Self::Item) -> Result<(), TrySendError<Self::Item>>
where
Self: Unpin,
{
let pin = Pin::new(self);
match pin.poll_send(&mut Context::empty(), value) {
PollSend::Ready => Ok(()),
PollSend::Pending(value) => Err(TrySendError::Pending(value)),
PollSend::Rejected(value) => Err(TrySendError::Rejected(value)),
}
}
#[cfg(feature = "blocking")]
fn blocking_send(&mut self, value: Self::Item) -> Result<(), SendError<Self::Item>>
where
Self: Unpin,
{
pollster::block_on(self.send(value))
}
fn after<Before>(self, before: Before) -> chain::ChainSink<Before, Self>
where
Before: Sink<Item = Self::Item>,
Self: Sized,
{
chain::ChainSink::new(before, self)
}
fn filter<Filter>(self, filter: Filter) -> filter::FilterSink<Filter, Self>
where
Filter: FnMut(&Self::Item) -> bool,
Self: Sized,
{
filter::FilterSink::new(filter, self)
}
#[cfg(feature = "logging")]
fn log(self, level: log::Level) -> sink_log::SinkLog<Self>
where
Self: Sized,
Self::Item: std::fmt::Debug,
{
sink_log::SinkLog::new(self, level)
}
}
impl<S> Sink for &mut S
where
S: Sink + Unpin + ?Sized,
{
type Item = S::Item;
fn poll_send(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
value: Self::Item,
) -> PollSend<Self::Item> {
S::poll_send(Pin::new(&mut **self), cx, value)
}
}
impl<P, S> Sink for Pin<P>
where
P: DerefMut<Target = S> + Unpin,
S: Sink + Unpin + ?Sized,
{
type Item = <S as Sink>::Item;
fn poll_send(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
value: Self::Item,
) -> PollSend<Self::Item> {
Pin::get_mut(self).as_mut().poll_send(cx, value)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PollSend<T> {
Ready,
Pending(T),
Rejected(T),
}
#[pin_project]
#[must_use = "futures do nothing unless polled"]
pub struct SendFuture<'s, S>
where
S: Sink + ?Sized,
{
#[pin]
send: &'s mut S,
value: Option<S::Item>,
#[pin]
_pin: PhantomPinned,
}
impl<'s, S> SendFuture<'s, S>
where
S: Sink + ?Sized,
{
pub fn new(send: &'s mut S, value: S::Item) -> SendFuture<S> {
Self {
send,
value: Some(value),
_pin: PhantomPinned,
}
}
}
impl<'s, S> Future for SendFuture<'s, S>
where
S: Sink + Unpin + ?Sized,
{
type Output = Result<(), SendError<S::Item>>;
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
if self.value.is_none() {
return Poll::Ready(Ok(()));
}
let this = self.project();
let mut cx: crate::Context<'_> = cx.into();
match this.send.poll_send(&mut cx, this.value.take().unwrap()) {
PollSend::Ready => Poll::Ready(Ok(())),
PollSend::Pending(value) => {
*this.value = Some(value);
Poll::Pending
}
PollSend::Rejected(value) => Poll::Ready(Err(SendError(value))),
}
}
}
#[cfg(test)]
mod tests {
#[cfg(feature = "blocking")]
#[test]
fn test_blocking() {
use super::Sink;
use crate::test::sink::ready;
let mut stream = ready();
assert_eq!(Ok(()), stream.blocking_send(1usize));
}
}