tokio-core 0.1.16

Core I/O and event loop primitives for asynchronous I/O in Rust. Foundation for the rest of the tokio crates.
Documentation
//! A "tiny database" and accompanying protocol
//!
//! This example shows the usage of shared state amongst all connected clients,
//! namely a database of key/value pairs. Each connected client can send a
//! series of GET/SET commands to query the current value of a key or set the
//! value of a key.
//!
//! This example has a simple protocol you can use to interact with the server.
//! To run, first run this in one terminal window:
//!
//!     cargo run --example tinydb
//!
//! and next in another windows run:
//!
//!     cargo run --example connect 127.0.0.1:8080
//!
//! In the `connect` window you can type in commands where when you hit enter
//! you'll get a response from the server for that command. An example session
//! is:
//!
//!
//!     $ cargo run --example connect 127.0.0.1:8080
//!     GET foo
//!     foo = bar
//!     GET FOOBAR
//!     error: no key FOOBAR
//!     SET FOOBAR my awesome string
//!     set FOOBAR = `my awesome string`, previous: None
//!     SET foo tokio
//!     set foo = `tokio`, previous: Some("bar")
//!     GET foo
//!     foo = tokio
//!
//! Namely you can issue two forms of commands:
//!
//! * `GET $key` - this will fetch the value of `$key` from the database and
//!   return it. The server's database is initially populated with the key `foo`
//!   set to the value `bar`
//! * `SET $key $value` - this will set the value of `$key` to `$value`,
//!   returning the previous value, if any.

extern crate futures;
extern crate tokio_core;
extern crate tokio_io;

use std::cell::RefCell;
use std::collections::HashMap;
use std::io::BufReader;
use std::rc::Rc;
use std::env;
use std::net::SocketAddr;

use futures::prelude::*;
use tokio_core::net::TcpListener;
use tokio_core::reactor::Core;
use tokio_io::AsyncRead;
use tokio_io::io::{lines, write_all};

/// The in-memory database shared amongst all clients.
///
/// This database will be shared via `Rc`, so to mutate the internal map we're
/// also going to use a `RefCell` for interior mutability.
struct Database {
    map: RefCell<HashMap<String, String>>,
}

/// Possible requests our clients can send us
enum Request {
    Get { key: String },
    Set { key: String, value: String },
}

/// Responses to the `Request` commands above
enum Response {
    Value { key: String, value: String },
    Set { key: String, value: String, previous: Option<String> },
    Error { msg: String },
}

fn main() {
    // Parse the address we're going to run this server on, create a `Core`, and
    // set up our TCP listener to accept connections.
    let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
    let addr = addr.parse::<SocketAddr>().unwrap();
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let listener = TcpListener::bind(&addr, &handle).expect("failed to bind");
    println!("Listening on: {}", addr);

    // Create the shared state of this server that will be shared amongst all
    // clients. We populate the initial database and then create the `Database`
    // structure. Note the usage of `Rc` here which will be used to ensure that
    // each independently spawned client will have a reference to the in-memory
    // database.
    let mut initial_db = HashMap::new();
    initial_db.insert("foo".to_string(), "bar".to_string());
    let db = Rc::new(Database {
        map: RefCell::new(initial_db),
    });

    let done = listener.incoming().for_each(move |(socket, _addr)| {
        // As with many other small examples, the first thing we'll do is
        // *split* this TCP stream into two separately owned halves. This'll
        // allow us to work with the read and write halves independently.
        let (reader, writer) = socket.split();

        // Since our protocol is line-based we use `tokio_io`'s `lines` utility
        // to convert our stream of bytes, `reader`, into a `Stream` of lines.
        let lines = lines(BufReader::new(reader));

        // Here's where the meat of the processing in this server happens. First
        // we see a clone of the database being created, which is creating a
        // new reference for this connected client to use. Also note the `move`
        // keyword on the closure here which moves ownership of the reference
        // into the closure, which we'll need for spawning the client below.
        //
        // The `map` function here means that we'll run some code for all
        // requests (lines) we receive from the client. The actual handling here
        // is pretty simple, first we parse the request and if it's valid we
        // generate a response based on the values in the database.
        let db = db.clone();
        let responses = lines.map(move |line| {
            let request = match Request::parse(&line) {
                Ok(req) => req,
                Err(e) => return Response::Error { msg: e },
            };

            let mut db = db.map.borrow_mut();
            match request {
                Request::Get { key } => {
                    match db.get(&key) {
                        Some(value) => Response::Value { key, value: value.clone() },
                        None => Response::Error { msg: format!("no key {}", key) },
                    }
                }
                Request::Set { key, value } => {
                    let previous = db.insert(key.clone(), value.clone());
                    Response::Set { key, value, previous }
                }
            }
        });

        // At this point `responses` is a stream of `Response` types which we
        // now want to write back out to the client. To do that we use
        // `Stream::fold` to perform a loop here, serializing each response and
        // then writing it out to the client.
        let writes = responses.fold(writer, |writer, response| {
            let mut response = response.serialize();
            response.push('\n');
            write_all(writer, response.into_bytes()).map(|(w, _)| w)
        });

        // Like with other small servers, we'll `spawn` this client to ensure it
        // runs concurrently with all other clients, for now ignoring any errors
        // that we see.
        let msg = writes.then(move |_| Ok(()));
        handle.spawn(msg);
        Ok(())
    });

    core.run(done).unwrap();
}

impl Request {
    fn parse(input: &str) -> Result<Request, String> {
        let mut parts = input.splitn(3, " ");
        match parts.next() {
            Some("GET") => {
                let key = match parts.next() {
                    Some(key) => key,
                    None => return Err(format!("GET must be followed by a key")),
                };
                if parts.next().is_some() {
                    return Err(format!("GET's key must not be followed by anything"))
                }
                Ok(Request::Get { key: key.to_string() })
            }
            Some("SET") => {
                let key = match parts.next() {
                    Some(key) => key,
                    None => return Err(format!("SET must be followed by a key")),
                };
                let value = match parts.next() {
                    Some(value) => value,
                    None => return Err(format!("SET needs a value")),
                };
                Ok(Request::Set { key: key.to_string(), value: value.to_string() })
            }
            Some(cmd) => Err(format!("unknown command: {}", cmd)),
            None => Err(format!("empty input")),
        }
    }
}

impl Response {
    fn serialize(&self) -> String {
        match *self {
            Response::Value { ref key, ref value } => {
                format!("{} = {}", key, value)
            }
            Response::Set { ref key, ref value, ref previous } => {
                format!("set {} = `{}`, previous: {:?}", key, value, previous)
            }
            Response::Error { ref msg } => {
                format!("error: {}", msg)
            }
        }
    }
}