ruyi 0.1.6

An event-driven framework for non-blocking, asynchronous I/O in Rust
Documentation
#[macro_use]
extern crate log;
extern crate env_logger;

extern crate num_cpus;
extern crate futures;
extern crate ruyi;

use std::thread;

use futures::{future, Future, Stream};
use ruyi::stream::IntoStream;
use ruyi::channel::spsc;
use ruyi::io;
use ruyi::net::{TcpStream, TcpListener};
use ruyi::reactor::{self, IntoTask};

fn echo(sock: TcpStream) {
    // Disable Nagle's algorithm
    let task = future::result(sock.set_nodelay(true))
        .and_then(|_| {
            let (r, w) = io::split(sock);
            // Send whatever received back to client
            io::copy(r, w)
        })
        .map_err(|e| error!("{}", e))
        .into_task();
    reactor::spawn(task);
}

fn main() {
    // Initialize logger
    env_logger::init().unwrap();

    // Get number of cpu cores to create same quantity of IO workers
    let n = num_cpus::get();
    let mask = n - 1;
    let mut workers = Vec::with_capacity(n);
    for _ in 0..n {
        // Create a spsc queue to send accepted sockets
        // to the corresponding IO thread
        let (tx, rx) = spsc::sync_channel(512).unwrap();

        // Start an IO worker
        thread::spawn(move || {
            reactor::run(rx.into_stream().for_each(|s| Ok(echo(s)))).unwrap()
        });
        workers.push(tx);
    }

    let mut i = 0;

    // Build a TCP acceptor
    let acceptor = TcpListener::builder()
        .port(10007)
        .build()
        .unwrap()
        .incoming()
        .for_each(|(sock, _)| {
            // Dispatch sockets to IO workers in a round-robin manner
            workers[i].try_send(sock).unwrap();
            i = (i + 1) & mask;
            Ok(())
        });

    // Run acceptor
    reactor::run(acceptor).unwrap();
}