1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use super::SessionHandler;
use super::SessionState;
use crate::common::*;
use crate::model::io::*;

/// Adapter that feeds controls read from `input` to a [`SessionHandler`] and
/// exposes whatever the handler wants to send back as a stream of
/// `WriteControl` items.
///
/// The `Stream` implementation in this file requires `I` to yield
/// `Result<ReadControl>` items; the struct itself places no bound on `I`.
#[pin_project(project=SessionProj)]
pub struct StatefulSession<I, H: SessionHandler> {
    /// Source of incoming controls. Structurally pinned so it can be polled
    /// through the pin projection.
    #[pin]
    input: I,
    /// Session logic: `handle` consumes the session data together with one
    /// incoming control, `pop` extracts the next outgoing control.
    handler: H,
    /// Current session state: either `Ready` data or a `Pending` computation
    /// that is polled until it produces the data. Wrapped in `Option` so the
    /// state can be taken by value while the handler runs; every method
    /// expects it to be `Some` on entry and panics otherwise.
    state: Option<State<H::Data>>,
}

/// File-local shorthand for [`SessionState`].
type State<T> = SessionState<T>;

/// Drives the session: every poll first lets any pending handler work finish,
/// then drains answers from the handler, and only reads more input once the
/// handler has nothing left to say.
impl<I, H> Stream for StatefulSession<I, H>
where
    H: SessionHandler,
    I: Stream<Item = Result<ReadControl>>,
{
    type Item = Result<WriteControl>;

    /// Yields the next outgoing control.
    ///
    /// Returns `Poll::Pending` whenever the pending state or the input is not
    /// ready, and `Some(Err(_))` when either of those steps fails.
    ///
    /// NOTE(review): no path in this method returns `Poll::Ready(None)`, so
    /// the stream never reports completion by itself and the input keeps
    /// being polled after it has signalled its end - confirm that consumers
    /// stop based on the yielded controls.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        trace!("polling next");
        loop {
            // The session data must be available before the handler can be asked for answers.
            ready!(self.as_mut().poll_pending(cx))?;

            match self.as_mut().pop_answer() {
                Some(answer) => return Poll::Ready(Some(Ok(answer))),
                // Nothing queued: feed the handler one more input item and go around again.
                None => ready!(self.as_mut().poll_input(cx))?,
            }
        }
    }
}

impl<I, H: SessionHandler> StatefulSession<I, H> {
    /// Creates a session over `input` driven by `handler`, starting in the
    /// `Ready` state with default session data.
    pub fn new(input: I, handler: H) -> Self {
        let state = Some(State::Ready(H::Data::default()));
        Self {
            input,
            handler,
            state,
        }
    }

    /// Polls a `Pending` state to completion and stores the produced data as
    /// the new `Ready` state. A state that is already `Ready` is left as is.
    ///
    /// Returns `Poll::Pending` while the pending computation is not finished,
    /// otherwise `Poll::Ready(Ok(()))`.
    ///
    /// # Panics
    ///
    /// Panics if the state slot is empty.
    fn poll_pending(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        trace!("polling pending");
        let SessionProj { state: slot, .. } = self.project();
        if let State::Pending(fut) = slot.as_mut().expect("state must be set") {
            let data = ready!(fut.as_mut().poll(cx));
            *slot = Some(State::Ready(data));
        }
        Poll::Ready(Ok(()))
    }

    /// Asks the handler for the next outgoing control.
    ///
    /// Returns `None` while the state is still `Pending`; otherwise returns
    /// whatever `handler.pop` produces for the ready session data.
    ///
    /// # Panics
    ///
    /// Panics if the state slot is empty.
    fn pop_answer(self: Pin<&mut Self>) -> Option<WriteControl> {
        trace!("popping answer");
        let SessionProj { handler, state, .. } = self.project();
        if let State::Ready(data) = state.as_mut().expect("state must be set") {
            handler.pop(data)
        } else {
            None
        }
    }
}
impl<I, H> StatefulSession<I, H>
where
    I: Stream<Item = Result<ReadControl>>,
    H: SessionHandler,
{
    /// Reads one item from the input and hands it to the handler.
    ///
    /// * If the state is `Pending`, nothing is read and `Poll::Ready(Ok(()))`
    ///   is returned so the caller can poll the pending state first.
    /// * If the input yields a control, the handler consumes the session data
    ///   together with that control and its result becomes the new state.
    /// * If the input has ended, the handler is given
    ///   `ReadControl::PeerShutdown` instead.
    /// * If the input is not ready, the session data is put back and
    ///   `Poll::Pending` is returned.
    ///
    /// # Errors
    ///
    /// An error yielded by the input is returned to the caller. The session
    /// data is restored first, so the session remains pollable afterwards.
    ///
    /// # Panics
    ///
    /// Panics if the state slot is empty.
    fn poll_input(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        trace!("polling input");
        let SessionProj {
            input,
            state,
            handler,
        } = self.project();
        // The handler takes the session data by value, so the state is moved
        // out here. Every exit path below must store a state again.
        match state.take().expect("state must be set") {
            State::Pending(s) => {
                *state = Some(State::Pending(s));
                Poll::Ready(Ok(()))
            }
            State::Ready(data) => {
                let control = match input.poll_next(cx) {
                    Poll::Ready(Some(Ok(control))) => control,
                    // End of input is reported to the handler as a peer shutdown.
                    Poll::Ready(None) => ReadControl::PeerShutdown,
                    Poll::Ready(Some(Err(e))) => {
                        // Restore the data before propagating: returning with
                        // an empty slot would make the next poll panic and
                        // would drop the session data.
                        *state = Some(State::Ready(data));
                        return Poll::Ready(Err(e));
                    }
                    Poll::Pending => {
                        *state = Some(State::Ready(data));
                        return Poll::Pending;
                    }
                };
                *state = Some(handler.handle(data, control));
                Poll::Ready(Ok(()))
            }
        }
    }
}