futures-io 0.1.0

An I/O abstraction layer built on top of streams of readiness. Includes many std::io primitives intended for usage with futures.
use std::io::{self, Read};
use std::cmp;

use futures::{Poll, Task};
use futures::stream::Stream;

use Ready;

pub struct Take<A> {
    a: A,
    left: u64,
}

pub fn take<A>(a: A, amt: u64) -> Take<A>
    where A: Stream<Item=Ready, Error=io::Error> + Read,
{
    Take {
        a: a,
        left: amt,
    }
}

impl<A> Stream for Take<A>
    where A: Stream<Item=Ready, Error=io::Error> + Read,
{
    type Item = Ready;
    type Error = io::Error;

    fn poll(&mut self, task: &mut Task) -> Poll<Option<Ready>, io::Error> {
        if self.left == 0 {
            Poll::Ok(None)
        } else {
            self.a.poll(task)
        }
    }

    fn schedule(&mut self, task: &mut Task) {
        if self.left == 0 {
            task.notify()
        } else {
            self.a.schedule(task)
        }
    }
}

impl<A> Read for Take<A>
    where A: Stream<Item=Ready, Error=io::Error> + Read,
{
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if self.left == 0 {
            return Ok(0)
        }

        let amt = cmp::min(buf.len() as u64, self.left) as usize;
        let n = try!(self.a.read(&mut buf[..amt]));
        self.left -= n as u64;
        Ok(n)
    }
}