1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
use fallible_iterator::FallibleIterator;
use futures::sync::mpsc;
use futures::{try_ready, Async, Poll, Stream};
use postgres_protocol::message::backend;

use crate::proto::codec::BackendMessages;
use crate::Error;

/// Builds a connected pair: a sender for batches of backend messages and the
/// [`Responses`] stream that consumes them.
///
/// The underlying channel is created with `mpsc::channel(1)`, and the returned
/// `Responses` starts out holding `BackendMessages::empty()` as its current
/// batch.
pub fn channel() -> (mpsc::Sender<BackendMessages>, Responses) {
    let (tx, rx) = mpsc::channel(1);
    let responses = Responses {
        receiver: rx,
        cur: BackendMessages::empty(),
    };

    (tx, responses)
}

/// Receiving half of the pair returned by [`channel`].
///
/// Batches of backend messages arrive over `receiver`; the `Stream`
/// implementation hands them out one message at a time.
pub struct Responses {
    /// Channel end on which new `BackendMessages` batches arrive.
    receiver: mpsc::Receiver<BackendMessages>,
    /// The batch currently being iterated; replaced by the next received
    /// batch once it yields no more messages.
    cur: BackendMessages,
}

impl Stream for Responses {
    type Item = backend::Message;
    type Error = Error;

    fn poll(&mut self) -> Poll<Option<backend::Message>, Error> {
        loop {
            if let Some(message) = self.cur.next().map_err(Error::parse)? {
                return Ok(Async::Ready(Some(message)));
            }

            match try_ready!(self.receiver.poll().map_err(|()| Error::closed())) {
                Some(messages) => self.cur = messages,
                None => return Ok(Async::Ready(None)),
            }
        }
    }
}