async_docker 0.1.1

A Rust asynchronous interface for Docker API
Documentation
// Code written by Sam Rijs (https://github.com/srijs)
//
// Source:
// https://github.com/hyperium/hyper/issues/1335
// https://play.rust-lang.org/?gist=971e438cabd6f91efb76b7e45b15edf3&version=stable

use std::mem::replace;
use std::string::FromUtf8Error;

use futures::{Async, Poll, Stream};
use futures::stream::{Fuse};

use std::iter::Iterator;

pub(crate) struct Lines<S: Stream> {
    buffered: Option<Vec<u8>>,
    stream: Fuse<S>
}

impl<S: Stream> Lines<S> {
    pub fn new(stream: S) -> Lines<S> {
        Lines {
            buffered: None,
            stream: stream.fuse()
        }
    }

    fn process(&mut self, flush: bool) -> Option<Result<String, FromUtf8Error>> {
        let buffered = replace(&mut self.buffered, None);
        if let Some(ref buffer) = buffered {
            let mut split = buffer.splitn(2, |c| *c == b'\n');
            if let Some(first) = split.next() {
                if let Some(second) = split.next() {
                    replace(&mut self.buffered, Some(second.to_vec()));
                    return Some(String::from_utf8(first.to_vec()));
                } else if flush {
                    return Some(String::from_utf8(first.to_vec()));
                }
            }
        }
        replace(&mut self.buffered, buffered);
        None
    }
}

impl<S> Stream for Lines<S>
    where S: Stream, S::Item: AsRef<[u8]>, S::Error: From<FromUtf8Error>
{
    type Item = String;
    type Error = S::Error;

    fn poll(&mut self) -> Poll<Option<String>, S::Error> {
        match self.stream.poll()? {
            Async::NotReady => Ok(Async::NotReady),
            Async::Ready(None) => {
                match self.process(true) {
                    Some(Ok(line)) => Ok(Async::Ready(Some(line))),
                    Some(Err(err)) => Err(err.into()),
                    None => Ok(Async::Ready(None))
                }
            },
            Async::Ready(Some(chunk)) => {
                if let Some(ref mut buffer) = self.buffered {
                    buffer.extend(chunk.as_ref());
                } else {
                    self.buffered = Some(chunk.as_ref().to_vec());
                }
                match self.process(false) {
                    Some(Ok(line)) => Ok(Async::Ready(Some(line))),
                    Some(Err(err)) => Err(err.into()),
                    None => Ok(Async::NotReady)
                }
            }
        }
    }
}