use futures::{FutureExt, Stream, StreamExt};
use futures_timeout::{Timeout, TimeoutExt};
use std::future::Future;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
pub struct InnerMap<K, S> {
key: K,
inner: Option<S>,
wake_on_success: bool,
}
impl<K, S> InnerMap<K, S> {
pub fn new(key: K, inner: S) -> Self {
Self {
key,
inner: Some(inner),
wake_on_success: false,
}
}
pub fn set_wake_on_success(&mut self, wake_on_success: bool) -> bool {
let prev = self.wake_on_success;
self.wake_on_success = wake_on_success;
wake_on_success != prev
}
pub fn key(&self) -> &K {
&self.key
}
pub fn key_value(&self) -> Option<(&K, &S)> {
let Self { key, inner, .. } = self;
inner.as_ref().map(|st| (key, st))
}
pub fn key_value_mut(&mut self) -> Option<(&K, &mut S)> {
let Self { ref key, inner, .. } = self;
inner.as_mut().map(|s| (key, s))
}
pub fn inner(&self) -> Option<&S> {
self.inner.as_ref()
}
pub fn inner_mut(&mut self) -> Option<&mut S> {
self.inner.as_mut()
}
pub fn take_inner(&mut self) -> Option<S> {
self.inner.take()
}
}
impl<K, S> InnerMap<K, S>
where
K: Unpin + Clone,
S: Unpin,
{
pub fn key_value_pin(&mut self) -> Option<(&K, Pin<&mut S>)> {
let Self { ref key, inner, .. } = self;
inner.as_mut().map(|s| (key, Pin::new(s)))
}
pub fn inner_pin(&mut self) -> Option<Pin<&mut S>> {
self.inner_mut().map(Pin::new)
}
}
impl<K, S> Future for InnerMap<K, S>
where
K: Clone + Unpin,
S: Future + Unpin,
{
type Output = (K, Option<S::Output>);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;
let Some(st) = this.inner_pin() else {
return Poll::Ready((this.key.clone(), None));
};
let output = futures::ready!(st.poll(cx));
this.inner.take();
Poll::Ready((this.key.clone(), Some(output)))
}
}
impl<K, S> Stream for InnerMap<K, S>
where
K: Clone + Unpin,
S: Stream + Unpin,
{
type Item = (K, Option<S::Item>);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self;
let Some(st) = this.inner_pin() else {
return Poll::Ready(None);
};
match st.poll_next(cx) {
Poll::Ready(Some(value)) => {
if this.wake_on_success {
cx.waker().wake_by_ref();
}
Poll::Ready(Some((this.key.clone(), Some(value))))
}
Poll::Ready(None) => {
this.inner.take();
Poll::Ready(Some((this.key.clone(), None)))
}
Poll::Pending => Poll::Pending,
}
}
}
pub struct Timed<F>(Timeout<F>);
impl<F> Deref for Timed<F> {
type Target = F;
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl<F> DerefMut for Timed<F> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.0.deref_mut()
}
}
impl<F> Timed<F> {
pub(crate) fn new(item: F, timeout: Duration) -> Self {
Self(item.timeout(timeout))
}
}
impl<F> Future for Timed<F>
where
F: Future + Unpin,
{
type Output = std::io::Result<F::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.0.poll_unpin(cx)
}
}
impl<F> Stream for Timed<F>
where
F: Stream + Unpin,
{
type Item = std::io::Result<F::Item>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
}