tk-bufstream 0.3.0

A buffered stream backed by contiguous buffers (netbuf) for tokio
Documentation
extern crate futures;
extern crate tokio_core;
extern crate tk_bufstream;

use std::io;
use std::str;
use std::io::Write;
use std::net::SocketAddr;
use std::env;

use futures::Future;
use futures::stream::Stream;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tk_bufstream::{Buf, IoBuf, Encode, Decode};

/// Maximum number of per-connection futures driven at the same time;
/// passed to `buffer_unordered` in `main`.
const MAX_CONNECTIONS: usize = 200;

/// One line of text exchanged with a peer, stored without its trailing
/// `\n` (the codec strips it on decode and appends it on encode).
struct Line(String);

/// Newline-delimited text codec; a field-less unit struct that
/// implements both `Encode` and `Decode` for `Line`.
struct Codec;

impl Encode for Codec {
    type Item = Line;
    fn encode(&mut self, data: Line, buf: &mut Buf) {
        writeln!(buf, "{}", &data.0).unwrap();
    }
}

/// Parses `\n`-terminated UTF-8 lines out of the input buffer.
impl Decode for Codec {
    type Item = Line;

    /// Tries to extract one complete line from the front of `buf`.
    ///
    /// Returns `Ok(None)` when no `\n` is buffered yet, `Ok(Some(line))`
    /// with the text before the first `\n` (the line and its terminator
    /// are then consumed from `buf`), or an `io::Error` of kind `Other`
    /// when the bytes before the newline are not valid UTF-8. On error
    /// nothing is consumed.
    fn decode(&mut self, buf: &mut Buf) -> Result<Option<Line>, io::Error> {
        // Locate the terminator first; without one there is nothing to do.
        let newline_at = match buf[..].iter().position(|&byte| byte == b'\n') {
            Some(idx) => idx,
            None => return Ok(None),
        };

        // Copy the text out before consuming, so the borrow of `buf` ends.
        let text = match str::from_utf8(&buf[..newline_at]) {
            Ok(valid) => valid.to_owned(),
            Err(_) => {
                return Err(io::Error::new(io::ErrorKind::Other,
                                          "can't decode utf-8"));
            }
        };

        // Drop the line together with its `\n`.
        buf.consume(newline_at + 1);
        Ok(Some(Line(text)))
    }
}

/// Runs a line-echo TCP server on the current thread.
///
/// The listen address is read from the first command-line argument and
/// defaults to `127.0.0.1:7777`. Every accepted socket is wrapped in an
/// `IoBuf`, framed with `Codec`, split into a sink and a stream, and the
/// decoded lines are forwarded straight back to the same peer. At most
/// `MAX_CONNECTIONS` connections are driven concurrently.
///
/// Panics if the address cannot be parsed, the reactor or listener cannot
/// be created, or the accept loop itself fails.
fn main() {
    // `unwrap_or_else` so the default string is only allocated when needed.
    let addr = env::args().nth(1)
        .unwrap_or_else(|| "127.0.0.1:7777".to_string());
    let addr = addr.parse::<SocketAddr>()
        .expect("listen address must look like IP:PORT");

    let mut lp = Core::new().unwrap();
    let handle = lp.handle();
    let socket = TcpListener::bind(&addr, &handle).unwrap();
    println!("Listening on: {}", addr);

    let done = socket.incoming()
        // NOTE: an accept error is logged and still ends the accept loop.
        .map_err(|e| { println!("Accept error: {}", e); })
        .map(|(socket, _addr)| {
            let (sink, stream) = IoBuf::new(socket)
                .framed(Codec).split();
            stream.forward(sink)
                .map(|_| ())
                // Log a failed connection but always resolve to `Ok`:
                // an `Err` here would propagate through `buffer_unordered`
                // and `for_each`, shutting the whole server down because
                // of a single misbehaving client.
                .then(|res| {
                    if let Err(e) = res {
                        println!("Connection error: {}", e);
                    }
                    Ok::<(), ()>(())
                })
        }).buffer_unordered(MAX_CONNECTIONS)
          .for_each(|()| Ok(()));
    lp.run(done).unwrap();
}