chunks/
lib.rs

1#![deny(missing_docs)]
2
3//! Store and retrieve chunks, on another thread or something.
4
5#[macro_use]
6extern crate log;
7extern crate crossbeam_channel as chan;
8
9use std::collections::hash_map::Entry::{Occupied, Vacant};
10use std::collections::HashMap;
11use std::{io, mem, thread};
12
13mod chunk;
14pub use chunk::Chunk;
15
16mod store;
17pub use store::Store;
18
19// State Transitions
20// =================
21// - load:     None      -> Loading
22// - loaded:   Loading   -> Loaded
23// - reload:   None      -> Loading
24// - reload:   Loaded    -> Loading
25// - reap:     Loaded    -> Unloading (good)
26// - reap:     Loaded    -> None      (bad)
27// - unloaded: Unloading -> Loading (reload)
28// - unloaded: Unloading -> Loaded  (error)
29// - unloaded: Unloading -> None    (normal)
30
31/// It keeps your chunks warm for you.
32pub struct Chunks<S: Store> {
33    chunks: HashMap<S::Key, State<S>>,
34    req: chan::Sender<Req<S>>,
35    res: chan::Receiver<Res<S>>,
36}
37
38impl<S: Store> Chunks<S> {
39    /// Make one of these things.
40    pub fn new(mut store: S) -> Self {
41        let (reqt, reqr) = chan::unbounded();
42        let (rest, resr) = chan::unbounded();
43
44        thread::spawn(move || {
45            while let Some(msg) = reqr.recv() {
46                let res = match msg {
47                    Req::Load(key) => Res::Load(key, store.load(key)),
48                    Req::Unload(key, value) => {
49                        let res = store.store(key, &value);
50                        Res::Unload(key, value, res)
51                    }
52                    Req::Save(key, value) => {
53                        if let Err(e) = store.store(key, &value) {
54                            error!("Couldn't save chunk {:?} ({}).", key, e);
55                        }
56
57                        continue;
58                    }
59                };
60
61                rest.send(res);
62            }
63        });
64
65        Chunks {
66            chunks: HashMap::new(),
67            req: reqt,
68            res: resr,
69        }
70    }
71
72    /// Process any completed I/O requests.
73    pub fn tick(&mut self) {
74        while let Some(res) = self.res.try_recv() {
75            self.handle_response(res);
76        }
77    }
78
79    /// Evict any chunks that weren't used since the last reap.
80    pub fn reap(&mut self) {
81        let Self { chunks, req, .. } = self;
82
83        chunks.retain(|key, state| {
84            let null = State::Loaded {
85                used: false,
86                chunk: Chunk::Bad,
87            };
88
89            *state = match mem::replace(state, null) {
90                State::Loaded { used: true, chunk } => State::Loaded { used: false, chunk },
91                State::Loaded { used: false, chunk } => match chunk {
92                    Chunk::Good { save: true, value } => {
93                        req.send(Req::Unload(*key, value));
94                        State::Unloading { reload: false }
95                    }
96                    _ => return false,
97                },
98                other => other,
99            };
100
101            true
102        });
103    }
104
105    /// Get the chunk or begin loading it.
106    pub fn load(&mut self, key: S::Key) -> Option<&mut Chunk<S>> {
107        match self.chunks.entry(key) {
108            Occupied(entry) => match entry.into_mut() {
109                State::Loaded { used, chunk } => {
110                    *used = true;
111                    Some(chunk)
112                }
113                State::Loading => None,
114                State::Unloading { reload } => {
115                    *reload = true;
116                    None
117                }
118            },
119            Vacant(entry) => {
120                self.req.send(Req::Load(key));
121                entry.insert(State::Loading);
122                None
123            }
124        }
125    }
126
127    /// Get the chunk or wait until it's been loaded.
128    pub fn load_sync(&mut self, key: S::Key) -> &mut Chunk<S> {
129        //TODO(quadrupleslap): Remove unsafe when NLL lands.
130        if let Some(entry) = (unsafe { &mut *(self as *mut Self) }).load(key) {
131            return entry;
132        }
133
134        loop {
135            let res = self.res.recv().unwrap();
136
137            let done = match res {
138                Res::Load(p, _) => p == key,
139                _ => false,
140            };
141
142            self.handle_response(res);
143
144            if done {
145                break;
146            }
147        }
148
149        match self.chunks.get_mut(&key) {
150            Some(State::Loaded { chunk, .. }) => chunk,
151            _ => unreachable!(),
152        }
153    }
154
155    /// Reload the chunk, discarding any changes.
156    pub fn reload(&mut self, key: S::Key) {
157        match self.chunks.entry(key) {
158            Occupied(entry) => match entry.into_mut() {
159                entry @ State::Loaded { .. } => {
160                    self.req.send(Req::Load(key));
161                    *entry = State::Loading;
162                }
163                State::Loading => {}
164                State::Unloading { reload } => {
165                    *reload = true;
166                }
167            },
168            Vacant(entry) => {
169                self.req.send(Req::Load(key));
170                entry.insert(State::Loading);
171            }
172        }
173    }
174
175    fn handle_response(&mut self, res: Res<S>) {
176        match res {
177            Res::Load(key, value) => {
178                debug_assert!(self.chunks.get(&key).unwrap().is_loading());
179
180                let chunk = match value {
181                    Ok(value) => Chunk::Good { save: false, value },
182                    Err(e) => {
183                        error!("Couldn't load chunk {:?} ({}).", key, e);
184                        Chunk::Bad
185                    }
186                };
187
188                self.chunks.insert(key, State::Loaded { used: true, chunk });
189            }
190            Res::Unload(key, value, result) => {
191                debug_assert!(self.chunks.get(&key).unwrap().is_unloading());
192
193                let reload = match self.chunks.get(&key) {
194                    Some(&State::Unloading { reload }) => reload,
195                    _ => false,
196                };
197
198                let error = if let Err(e) = result {
199                    error!("Couldn't unload chunk {:?} ({}).", key, e);
200                    true
201                } else {
202                    false
203                };
204
205                if reload {
206                    self.req.send(Req::Load(key));
207                    self.chunks.insert(key, State::Loading);
208                } else if error {
209                    self.chunks.insert(
210                        key,
211                        State::Loaded {
212                            used: false,
213                            chunk: Chunk::Good { save: true, value },
214                        },
215                    );
216                } else {
217                    self.chunks.remove(&key);
218                }
219            }
220        }
221    }
222}
223
224impl<S: Store> Chunks<S>
225where
226    S::Value: Clone,
227{
228    /// Save any unsaved chunks.
229    pub fn save(&mut self) {
230        for (key, state) in self.chunks.iter_mut() {
231            if let State::Loaded {
232                chunk: Chunk::Good { save, value },
233                ..
234            } = state
235            {
236                if *save {
237                    self.req.send(Req::Save(*key, value.clone()));
238                    *save = false;
239                }
240            }
241        }
242    }
243}
244
245enum State<S: Store> {
246    Loading,
247    Unloading { reload: bool },
248    Loaded { used: bool, chunk: Chunk<S> },
249}
250
251impl<S: Store> State<S> {
252    fn is_loading(&self) -> bool {
253        if let State::Loading = self {
254            true
255        } else {
256            false
257        }
258    }
259
260    fn is_unloading(&self) -> bool {
261        if let State::Unloading { .. } = self {
262            true
263        } else {
264            false
265        }
266    }
267}
268
269enum Req<S: Store> {
270    Load(S::Key),
271    Unload(S::Key, S::Value),
272    Save(S::Key, S::Value),
273}
274
275enum Res<S: Store> {
276    Load(S::Key, io::Result<S::Value>),
277    Unload(S::Key, S::Value, io::Result<()>),
278}