tokio 0.1.21

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
//! A chat server that broadcasts a message to all connections.
//!
//! This is a line-based server which accepts connections, reads lines from
//! those connections, and broadcasts the lines to all other connected clients.
//!
//! This example is similar to chat.rs, but uses combinators and a much more
//! functional style.
//!
//! Because we are here running the reactor/executor on the same thread instead
//! of a threadpool, we can avoid full synchronization with Arc + Mutex and use
//! Rc + RefCell instead. The max performance is however limited to a CPU HW
//! thread.
//!
//! You can test this out by running:
//!
//!     cargo run --example chat-combinator-current-thread
//!
//! And then in another window run:
//!
//!     cargo run --example connect 127.0.0.1:8080
//!
//! You can run the second command in multiple windows and then chat between the
//! two, seeing the messages from the other client as they're received. For all
//! connected clients they'll all join the same room and see everyone else's
//! messages.

#![deny(warnings)]

extern crate futures;
extern crate tokio;

use tokio::io;
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio::runtime::current_thread::{Runtime, TaskExecutor};

use std::cell::RefCell;
use std::collections::HashMap;
use std::env;
use std::io::BufReader;
use std::iter;
use std::rc::Rc;

fn main() -> Result<(), Box<std::error::Error>> {
    let mut runtime = Runtime::new().unwrap();

    // Create the TCP listener we'll accept connections on.
    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
    let addr = addr.parse()?;

    let socket = TcpListener::bind(&addr)?;
    println!("Listening on: {}", addr);

    // This is running on the Tokio current_thread runtime, so it will be single-
    // threaded. The `Rc<RefCell<...>>` allows state to be shared across the tasks.
    let connections = Rc::new(RefCell::new(HashMap::new()));

    // The server task asynchronously iterates over and processes each incoming
    // connection.
    let srv = socket
        .incoming()
        .map_err(|e| {
            println!("failed to accept socket; error = {:?}", e);
            e
        })
        .for_each(move |stream| {
            // The client's socket address
            let addr = stream.peer_addr()?;

            println!("New Connection: {}", addr);

            // Split the TcpStream into two separate handles. One handle for reading
            // and one handle for writing. This lets us use separate tasks for
            // reading and writing.
            let (reader, writer) = stream.split();

            // Create a channel for our stream, which other sockets will use to
            // send us messages. Then register our address with the stream to send
            // data to us.
            let (tx, rx) = futures::sync::mpsc::unbounded();
            let mut conns = connections.borrow_mut();
            conns.insert(addr, tx);

            // Define here what we do for the actual I/O. That is, read a bunch of
            // lines from the socket and dispatch them while we also write any lines
            // from other sockets.
            let connections_inner = connections.clone();
            let reader = BufReader::new(reader);

            // Model the read portion of this socket by mapping an infinite
            // iterator to each line off the socket. This "loop" is then
            // terminated with an error once we hit EOF on the socket.
            let iter = stream::iter_ok::<_, io::Error>(iter::repeat(()));

            let socket_reader = iter.fold(reader, move |reader, _| {
                // Read a line off the socket, failing if we're at EOF
                let line = io::read_until(reader, b'\n', Vec::new());
                let line = line.and_then(|(reader, vec)| {
                    if vec.len() == 0 {
                        Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"))
                    } else {
                        Ok((reader, vec))
                    }
                });

                // Convert the bytes we read into a string, and then send that
                // string to all other connected clients.
                let line = line.map(|(reader, vec)| (reader, String::from_utf8(vec)));

                // Move the connection state into the closure below.
                let connections = connections_inner.clone();

                line.map(move |(reader, message)| {
                    println!("{}: {:?}", addr, message);
                    let mut conns = connections.borrow_mut();

                    if let Ok(msg) = message {
                        // For each open connection except the sender, send the
                        // string via the channel.
                        let iter = conns
                            .iter_mut()
                            .filter(|&(&k, _)| k != addr)
                            .map(|(_, v)| v);
                        for tx in iter {
                            tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
                        }
                    } else {
                        let tx = conns.get_mut(&addr).unwrap();
                        tx.unbounded_send("You didn't send valid UTF-8.".to_string())
                            .unwrap();
                    }

                    reader
                })
            });

            // Whenever we receive a string on the Receiver, we write it to
            // `WriteHalf<TcpStream>`.
            let socket_writer = rx.fold(writer, |writer, msg| {
                let amt = io::write_all(writer, msg.into_bytes());
                let amt = amt.map(|(writer, _)| writer);
                amt.map_err(|_| ())
            });

            // Now that we've got futures representing each half of the socket, we
            // use the `select` combinator to wait for either half to be done to
            // tear down the other. Then we spawn off the result.
            let connections = connections.clone();
            let socket_reader = socket_reader.map_err(|_| ());
            let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));

            // Spawn locally a task to process the connection
            TaskExecutor::current()
                .spawn_local(Box::new(connection.then(move |_| {
                    let mut conns = connections.borrow_mut();
                    conns.remove(&addr);
                    println!("Connection {} closed.", addr);
                    Ok(())
                })))
                .unwrap();

            Ok(())
        })
        .map_err(|err| println!("error occurred: {:?}", err));

    // Spawn srv itself
    runtime.spawn(srv);

    // Execute server
    runtime.run().unwrap();
    Ok(())
}