1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use pin_project_lite::pin_project;

use crate::{
    actor::Actor,
    clock::{sleep, Instant, Sleep},
    fut::ActorStream,
};

pin_project! {
    /// Stream for the [`timeout`](super::ActorStreamExt::timeout) method.
    ///
    /// Wraps an inner stream together with a timer. Items of the inner stream
    /// are yielded as `Ok(item)`; when the timer completes while the inner
    /// stream is pending, `Err(())` is yielded instead.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Timeout<S> {
        // The wrapped stream; structurally pinned so it can be polled in place.
        #[pin]
        stream: S,
        // Duration added to `Instant::now()` whenever the timer is re-armed.
        dur: Duration,
        // Marks that the timer has to be re-armed the next time the inner
        // stream is found pending (e.g. after it has yielded an item).
        // Starts out `false` because the timer is already armed on creation.
        reset_timeout: bool,
        // The timer that is polled while the inner stream is pending;
        // structurally pinned because it is polled and reset in place.
        #[pin]
        timeout: Sleep,
    }
}

impl<S> Timeout<S> {
    pub(super) fn new(stream: S, timeout: Duration) -> Self {
        Self {
            stream,
            dur: timeout,
            reset_timeout: false,
            timeout: sleep(timeout),
        }
    }
}

impl<S, A> ActorStream<A> for Timeout<S>
where
    S: ActorStream<A>,
    A: Actor,
{
    /// `Ok(item)` for every item of the wrapped stream, `Err(())` whenever the
    /// timer completes while the wrapped stream is pending.
    type Item = Result<S::Item, ()>;

    /// Polls the wrapped stream first and falls back to the timer only when the
    /// wrapped stream is pending.
    ///
    /// * Wrapped stream yields an item: returns `Some(Ok(item))` and marks the
    ///   timer for re-arming.
    /// * Wrapped stream is exhausted: returns `None`.
    /// * Wrapped stream is pending: re-arms the timer if it was marked, then
    ///   polls it. A completed timer produces `Some(Err(()))`; otherwise the
    ///   result is `Poll::Pending`.
    ///
    /// After a timeout has been reported the timer is marked for re-arming, so
    /// a stream that stays pending reports one timeout per `dur` instead of
    /// polling the already completed timer again.
    fn poll_next(
        self: Pin<&mut Self>,
        act: &mut A,
        ctx: &mut A::Context,
        task: &mut Context<'_>,
    ) -> Poll<Option<Result<S::Item, ()>>> {
        let mut this = self.project();

        match this.stream.poll_next(act, ctx, task) {
            Poll::Ready(Some(item)) => {
                // Do not touch the timer yet: it is only re-armed once the
                // stream goes back to pending, so bursts of ready items do not
                // cause a reset per item.
                *this.reset_timeout = true;
                Poll::Ready(Some(Ok(item)))
            }
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => {
                // Only re-arm when a previous poll asked for it (an item was
                // yielded or a timeout was reported) and the stream is pending now.
                if *this.reset_timeout {
                    *this.reset_timeout = false;
                    this.timeout.as_mut().reset(Instant::now() + *this.dur);
                }

                match this.timeout.as_mut().poll(task) {
                    Poll::Ready(_) => {
                        // The timer future has completed. A completed future
                        // must not be polled again, so request a re-arm before
                        // the next time it is consulted.
                        *this.reset_timeout = true;
                        Poll::Ready(Some(Err(())))
                    }
                    Poll::Pending => Poll::Pending,
                }
            }
        }
    }
}