fibers 0.1.13

A Rust library to execute a number of lightweight asynchronous tasks (a.k.a, fibers) based on futures and mio
Documentation
// Copyright (c) 2016 DWANGO Co., Ltd. All Rights Reserved.
// See the LICENSE file at the top-level directory of this distribution.

extern crate clap;
extern crate fibers;
extern crate futures;
extern crate handy_async;

use clap::{App, Arg};
use fibers::{Executor, Spawn, ThreadPoolExecutor};
use futures::{Future, Stream};
use handy_async::io::{AsyncWrite, ReadFrom};
use handy_async::pattern::AllowPartial;
use std::io;

fn main() {
    let matches = App::new("tcp_echo_srv")
        .arg(
            Arg::with_name("PORT")
                .short("p")
                .takes_value(true)
                .default_value("3000"),
        )
        .get_matches();
    let port = matches.value_of("PORT").unwrap();
    let addr = format!("0.0.0.0:{}", port)
        .parse()
        .expect("Invalid TCP bind address");

    let mut executor = ThreadPoolExecutor::new().expect("Cannot create Executor");
    let handle0 = executor.handle();
    let monitor = executor.spawn_monitor(fibers::net::TcpListener::bind(addr).and_then(
        move |listener| {
            println!("# Start listening: {}: ", addr);
            listener.incoming().for_each(move |(client, addr)| {
                println!("# CONNECTED: {}", addr);
                let handle1 = handle0.clone();
                handle0.spawn(
                    client
                        .and_then(move |client| {
                            let (reader, writer) = (client.clone(), client);
                            let (tx, rx) = fibers::sync::mpsc::channel();

                            // writer
                            handle1.spawn(
                                rx.map_err(|_| -> io::Error { unreachable!() })
                                    .fold(writer, |writer, buf: Vec<u8>| {
                                        println!("# SEND: {} bytes", buf.len());
                                        writer
                                            .async_write_all(buf)
                                            .map(|(w, _)| w)
                                            .map_err(|e| e.into_error())
                                    })
                                    .then(|r| {
                                        println!("# Writer finished: {:?}", r);
                                        Ok(())
                                    }),
                            );

                            // reader
                            let stream = vec![0; 1024].allow_partial().into_stream(reader);
                            stream
                                .map_err(|e| e.into_error())
                                .fold(tx, |tx, (mut buf, len)| {
                                    buf.truncate(len);
                                    println!("# RECV: {} bytes", buf.len());
                                    tx.send(buf).expect("Cannot send");
                                    Ok(tx) as io::Result<_>
                                })
                        })
                        .then(|r| {
                            println!("# Client finished: {:?}", r);
                            Ok(())
                        }),
                );
                Ok(())
            })
        },
    ));
    let result = executor.run_fiber(monitor).expect("Execution failed");
    println!("# Listener finished: {:?}", result);
}