chunks 0.1.1

Parallel-ish chunk loading that might make things slower.
Documentation
#![deny(missing_docs)]

//! Store and retrieve chunks, on another thread or something.

#[macro_use]
extern crate log;
extern crate crossbeam_channel as chan;

use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
use std::{io, mem, thread};

mod chunk;
pub use chunk::Chunk;

mod store;
pub use store::Store;

// State Transitions
// =================
// - load:     None      -> Loading
// - loaded:   Loading   -> Loaded
// - reload:   None      -> Loading
// - reload:   Loaded    -> Loading
// - reap:     Loaded    -> Unloading (good)
// - reap:     Loaded    -> None      (bad)
// - unloaded: Unloading -> Loading (reload)
// - unloaded: Unloading -> Loaded  (error)
// - unloaded: Unloading -> None    (normal)

/// It keeps your chunks warm for you.
pub struct Chunks<S: Store> {
    chunks: HashMap<S::Key, State<S>>,
    req: chan::Sender<Req<S>>,
    res: chan::Receiver<Res<S>>,
}

impl<S: Store> Chunks<S> {
    /// Make one of these things.
    pub fn new(mut store: S) -> Self {
        let (reqt, reqr) = chan::unbounded();
        let (rest, resr) = chan::unbounded();

        thread::spawn(move || {
            while let Some(msg) = reqr.recv() {
                let res = match msg {
                    Req::Load(key) => Res::Load(key, store.load(key)),
                    Req::Unload(key, value) => {
                        let res = store.store(key, &value);
                        Res::Unload(key, value, res)
                    }
                    Req::Save(key, value) => {
                        if let Err(e) = store.store(key, &value) {
                            error!("Couldn't save chunk {:?} ({}).", key, e);
                        }

                        continue;
                    }
                };

                rest.send(res);
            }
        });

        Chunks {
            chunks: HashMap::new(),
            req: reqt,
            res: resr,
        }
    }

    /// Process any completed I/O requests.
    pub fn tick(&mut self) {
        while let Some(res) = self.res.try_recv() {
            self.handle_response(res);
        }
    }

    /// Evict any chunks that weren't used since the last reap.
    pub fn reap(&mut self) {
        let Self { chunks, req, .. } = self;

        chunks.retain(|key, state| {
            let null = State::Loaded {
                used: false,
                chunk: Chunk::Bad,
            };

            *state = match mem::replace(state, null) {
                State::Loaded { used: true, chunk } => State::Loaded { used: false, chunk },
                State::Loaded { used: false, chunk } => match chunk {
                    Chunk::Good { save: true, value } => {
                        req.send(Req::Unload(*key, value));
                        State::Unloading { reload: false }
                    }
                    _ => return false,
                },
                other => other,
            };

            true
        });
    }

    /// Get the chunk or begin loading it.
    pub fn load(&mut self, key: S::Key) -> Option<&mut Chunk<S>> {
        match self.chunks.entry(key) {
            Occupied(entry) => match entry.into_mut() {
                State::Loaded { used, chunk } => {
                    *used = true;
                    Some(chunk)
                }
                State::Loading => None,
                State::Unloading { reload } => {
                    *reload = true;
                    None
                }
            },
            Vacant(entry) => {
                self.req.send(Req::Load(key));
                entry.insert(State::Loading);
                None
            }
        }
    }

    /// Get the chunk or wait until it's been loaded.
    pub fn load_sync(&mut self, key: S::Key) -> &mut Chunk<S> {
        //TODO(quadrupleslap): Remove unsafe when NLL lands.
        if let Some(entry) = (unsafe { &mut *(self as *mut Self) }).load(key) {
            return entry;
        }

        loop {
            let res = self.res.recv().unwrap();

            let done = match res {
                Res::Load(p, _) => p == key,
                _ => false,
            };

            self.handle_response(res);

            if done {
                break;
            }
        }

        match self.chunks.get_mut(&key) {
            Some(State::Loaded { chunk, .. }) => chunk,
            _ => unreachable!(),
        }
    }

    /// Reload the chunk, discarding any changes.
    pub fn reload(&mut self, key: S::Key) {
        match self.chunks.entry(key) {
            Occupied(entry) => match entry.into_mut() {
                entry @ State::Loaded { .. } => {
                    self.req.send(Req::Load(key));
                    *entry = State::Loading;
                }
                State::Loading => {}
                State::Unloading { reload } => {
                    *reload = true;
                }
            },
            Vacant(entry) => {
                self.req.send(Req::Load(key));
                entry.insert(State::Loading);
            }
        }
    }

    fn handle_response(&mut self, res: Res<S>) {
        match res {
            Res::Load(key, value) => {
                debug_assert!(self.chunks.get(&key).unwrap().is_loading());

                let chunk = match value {
                    Ok(value) => Chunk::Good { save: false, value },
                    Err(e) => {
                        error!("Couldn't load chunk {:?} ({}).", key, e);
                        Chunk::Bad
                    }
                };

                self.chunks.insert(key, State::Loaded { used: true, chunk });
            }
            Res::Unload(key, value, result) => {
                debug_assert!(self.chunks.get(&key).unwrap().is_unloading());

                let reload = match self.chunks.get(&key) {
                    Some(&State::Unloading { reload }) => reload,
                    _ => false,
                };

                let error = if let Err(e) = result {
                    error!("Couldn't unload chunk {:?} ({}).", key, e);
                    true
                } else {
                    false
                };

                if reload {
                    self.req.send(Req::Load(key));
                    self.chunks.insert(key, State::Loading);
                } else if error {
                    self.chunks.insert(
                        key,
                        State::Loaded {
                            used: false,
                            chunk: Chunk::Good { save: true, value },
                        },
                    );
                } else {
                    self.chunks.remove(&key);
                }
            }
        }
    }
}

impl<S: Store> Chunks<S>
where
    S::Value: Clone,
{
    /// Save any unsaved chunks.
    pub fn save(&mut self) {
        for (key, state) in self.chunks.iter_mut() {
            if let State::Loaded {
                chunk: Chunk::Good { save, value },
                ..
            } = state
            {
                if *save {
                    self.req.send(Req::Save(*key, value.clone()));
                    *save = false;
                }
            }
        }
    }
}

enum State<S: Store> {
    Loading,
    Unloading { reload: bool },
    Loaded { used: bool, chunk: Chunk<S> },
}

impl<S: Store> State<S> {
    fn is_loading(&self) -> bool {
        if let State::Loading = self {
            true
        } else {
            false
        }
    }

    fn is_unloading(&self) -> bool {
        if let State::Unloading { .. } = self {
            true
        } else {
            false
        }
    }
}

enum Req<S: Store> {
    Load(S::Key),
    Unload(S::Key, S::Value),
    Save(S::Key, S::Value),
}

enum Res<S: Store> {
    Load(S::Key, io::Result<S::Value>),
    Unload(S::Key, S::Value, io::Result<()>),
}