use message::{self, Message, Op, Code, Payload};
use tokio_core::reactor::Core;
use std::error::Error;
use futures::sync::oneshot::Sender;
use futures_cpupool::CpuPool;
use futures::future;
use std::io;
use error;
use lru_cache::LruCache;
use deque::{self, Worker, Stealer, Stolen};


type Store = LruCache<Vec<u8>, Payload>;

type Work = (Sender<Message>, Message);

/// A thread-safe wrapper around `LruCache` that synchronizes reads and writes through a
/// single worker task. The worker steals requests from a deque and sends each response
/// back over the oneshot channel carried in the request (`Work`) payload.
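///
/// A minimal usage sketch (marked `ignore`: it assumes a `Message::Request` value
/// `request` built by this crate's `message` module, which isn't shown here):
///
/// ```ignore
/// use futures::Future;
/// use futures::sync::oneshot;
///
/// let cache = Cache::new(1024)?;
/// let (snd, rcv) = oneshot::channel();
/// cache.process(request, snd); // enqueue the request for the worker
/// let response = rcv.wait()?;  // block until the worker sends a `Message::Response`
/// ```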
pub struct Cache {
    pool: CpuPool,
    core: Core,
    stealer: Stealer<Work>,
    worker: Worker<Work>,
}

impl Cache {
    /// Initialize a new `Cache` with `capacity` and start the worker thread.
    pub fn new(capacity: usize) -> Result<Self, io::Error> {
        let (worker, stealer) = deque::new();
        let cache = Cache {
            pool: CpuPool::new_num_cpus(),
            core: Core::new()?,
            worker: worker,
            stealer: stealer,
        };

        cache.start(capacity);
        Ok(cache)
    }

    /// Start the worker loop, which has unsynchronized access to the underlying store.
    /// `Work` is pushed onto the deque by `process` and stolen here. Each `Work` item is a
    /// `(Sender<Message>, Message)` pair, where `Message` is a request to run against the
    /// store and `Sender` is the oneshot channel on which to send the result.
    ///
    /// TODO: using `loop_fn` doesn't do what I thought, and this loop currently pegs the CPU
    /// just waiting for work. I think I need to make the work queue a pollable stream so that
    /// we can wait for new work without pegging the CPU (see the `work_stream` sketch after
    /// this impl block).
    pub fn start(&self, capacity: usize) {
        let stealer = self.stealer.clone();
        // Loop forever, attempting to steal work from the deque. When work is obtained,
        // it's dispatched to the `handle` function, which returns a `Result` containing a
        // `Message::Response` variant. The response is sent back through the `Sender`.
        let work = future::loop_fn(
            (stealer, LruCache::new(capacity)),
            |(stealer, mut store): (Stealer<Work>, Store)| {
                match stealer.steal() {
                    Stolen::Empty => (), // Continue
                    Stolen::Abort => (), // TODO: handle aborts; the obvious way of doing this doesn't seem to work
                    Stolen::Data(work) => {
                        let (snd, msg) = work;
                        let success = match handle(&mut store, msg) {
                            Ok(msg) => snd.send(msg),
                            Err(e) => snd.send(handle_error(&e)),
                        };
                        // `send` fails only when the receiver has been dropped; the
                        // unsent message is handed back as the `Err` value.
                        if let Err(_msg) = success {
                            println!("Failed to send response: receiver was dropped.");
                        }
                    }
                };
                future::ok(future::Loop::Continue((stealer, store)))
            },
        );
        // The loop itself runs on the thread pool; parking the returned `CpuFuture` on the
        // core keeps it alive, since dropping it would cancel the pooled computation.
        self.core.handle().spawn(self.pool.spawn(work));
    }

    /// Push work onto the queue. `snd` is a `futures::sync::oneshot::Sender<Message>`. When the
    /// worker has completed the request, it will send its `Message::Response` via the sender.
    pub fn process(&self, message: Message, snd: Sender<Message>) {
        self.worker.push((snd, message));
    }
}
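
// A sketch of the direction the TODO in `start` points toward: expose the stealer as a
// `futures::Stream` so an executor can poll for work instead of spinning. This helper is
// hypothetical and not wired into `Cache`; a real version would also need the pushing
// side (`process`) to notify the parked task, otherwise `NotReady` stalls it forever.
#[allow(dead_code)]
fn work_stream(stealer: Stealer<Work>) -> Box<::futures::Stream<Item = Work, Error = ()>> {
    use futures::{stream, Async, Poll};

    Box::new(stream::poll_fn(move || -> Poll<Option<Work>, ()> {
        match stealer.steal() {
            Stolen::Data(work) => Ok(Async::Ready(Some(work))),
            // No wakeup is registered here, so this arm parks the task until an
            // external notify arrives; that wiring is deliberately out of scope.
            Stolen::Empty | Stolen::Abort => Ok(Async::NotReady),
        }
    }))
}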

/// Handle a request against the store. The incoming `message` must be a `Message::Request`
/// variant from the front end; the returned message is a `Message::Response` variant.
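///
/// A sketch of a hit on the `Get` path (marked `ignore` since `message` is crate-internal;
/// the `0` passed to `message::payload` is purely illustrative):
///
/// ```ignore
/// let mut store: Store = LruCache::new(16);
/// store.insert(b"k".to_vec(), message::payload(0, b"v".to_vec()));
/// // A Get request for b"k" now produces Code::Hit with the stored payload;
/// // a Get for any absent key produces Code::Miss.
/// ```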
fn handle(store: &mut Store, message: Message) -> Result<Message, error::Error> {
    let op = message.op();
    let (key, payload) = message.consume_request()?;

    let response = match op {
        Op::Set => {
            let payload = payload.ok_or_else(|| "no payload given to set op")?;
            store.insert(key, payload);
            message::response(Op::Set, Code::Ok, None)
        }

        Op::Get => {
            if let Some(ref mut payload) = store.get_mut(key.as_slice()) {
                message::response(Op::Get, Code::Hit, Some(payload.clone()))
            } else {
                message::response(Op::Get, Code::Miss, None)
            }
        }

        Op::Del => {
            // Remove the key if present; deletes are idempotent, so respond `Ok` either way.
            store.remove(key.as_slice());
            message::response(Op::Del, Code::Ok, None)
        }
        Op::Stats => {
            // Report the current number of cached entries in the payload's numeric field.
            message::response(
                Op::Stats,
                Code::Ok,
                Some(message::payload(store.len() as u32, vec![])),
            )
        }
    };

    Ok(response)
}

/// Creates a `Message::Response`, setting the error code and passing the error
/// description as the payload. Responses with an error code should enforce the
/// invariant that the payload contains a UTF-8 encoded string, so that clients
/// can safely decode the payload for human consumption.
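///
/// A sketch of the client-side contract (marked `ignore`; it assumes some `error::Error`
/// value `err`):
///
/// ```ignore
/// let response = handle_error(&err);
/// // An error response's payload bytes are always valid UTF-8, so a client can
/// // decode them with `String::from_utf8` for display.
/// ```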
///
/// TODO: match over the error kind and translate it into an appropriate error for the front end.
fn handle_error(err: &error::Error) -> Message {
    message::response(
        // The op is currently hard-coded; see the TODO above about translating errors
        // for the front end.
        Op::Get,
        Code::Error,
        Some(message::payload(
            0,
            err.description().to_owned().into_bytes(),
        )),
    )
}