1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
use fallible_iterator::FallibleIterator; use futures::sync::mpsc; use futures::{try_ready, Async, Poll, Stream}; use postgres_protocol::message::backend; use crate::proto::codec::BackendMessages; use crate::Error; pub fn channel() -> (mpsc::Sender<BackendMessages>, Responses) { let (sender, receiver) = mpsc::channel(1); ( sender, Responses { receiver, cur: BackendMessages::empty(), }, ) } pub struct Responses { receiver: mpsc::Receiver<BackendMessages>, cur: BackendMessages, } impl Stream for Responses { type Item = backend::Message; type Error = Error; fn poll(&mut self) -> Poll<Option<backend::Message>, Error> { loop { if let Some(message) = self.cur.next().map_err(Error::parse)? { return Ok(Async::Ready(Some(message))); } match try_ready!(self.receiver.poll().map_err(|()| Error::closed())) { Some(messages) => self.cur = messages, None => return Ok(Async::Ready(None)), } } } }