writium_cache/
cache.rs

//! Implementation of caching for common use of Writium APIs.
//!
//! # Why should I use caches?
//!
//! The use of a cache system can help you reduce disk I/O so that more fluent
//! execution can be achieved.
//!
//! There is a question you may ask: why not simply use external caching
//! systems (like Nginx)? The answer is, `writium-cache` and Nginx are totally
//! different. Nginx caches *already generated* contents but `writium-cache`
//! *prevents I/O blocking* of working threads. It's needed because Writium API
//! calls are *synchronized* for you to write clean code, before the stable.
//!
//! # About pending
//!
//! For now, `writium-cache` doesn't offer any multi-threading features, but it
//! guarantees that everything won't go wrong in such a context. Only one
//! thread is involved at a time for a single `Cache` object. Every time an
//! uncached item is requested, a cached item needs to be unloaded/removed, and
//! heavy I/O might come up. In such a period of I/O, the item is attributed
//! 'pending'. Items in such a state are not exposed to users. They can
//! seriously influence the entire system's efficiency by blocking threads.
//! Such an outcome is commonly undesirable. Thus, the 'pending' state is
//! considered a performance issue and should be fixed in future versions.
//!
//! There are two cases in which an item is in the 'pending' state:
//!
//! 1. Cached data was dirty and is now 'abandoned' by `Cache` - a
//! corresponding `CacheSource` has been trying to write the data back to
//! local storage. If the data is requested again after this intermediate
//! state, the state will be restored to `Intact`. When unloading is
//! finished, data is written back to storage and is removed from the owning
//! `Cache`.
//!
//! 2. Cached data is being removed by a corresponding `CacheSource`. If the
//! data is requested again after this intermediate state, the state will be
//! restored to `Dirty` (as a new instance is created). When removal is
//! finished, data is removed from storage (as well as the owning `Cache`, if
//! the data was loaded).
40use std::sync::{Arc, Mutex};
41use writium::prelude::*;
42use item::CacheItem;
43
/// Error message returned when a `Mutex` lock fails because another thread
/// panicked while holding it (lock poisoning).
const ERR_POISONED_THREAD: &str = "Current thread is poisoned.";
45
/// Cache for each Writium Api. Any Writium Api can be composited with this
/// struct for cache.
pub struct Cache<T: 'static> {
    /// Cached items, kept ordered from most- to least-recently used (front to
    /// back). In `cache`, the flag `is_dirty` of an item indicates whether it
    /// should be written back to source.
    cache: Mutex<Vec<Arc<CacheItem<T>>>>,
    /// Backing source used to load, unload and remove cached values.
    src: Box<CacheSource<Value=T>>,
}
54impl<T: 'static> Cache<T> {
55    pub fn new<Src>(capacity: usize, src: Src) -> Cache<T>
56        where Src: 'static + CacheSource<Value=T> {
57        Cache {
58            cache: Mutex::new(Vec::with_capacity(capacity)),
59            src: Box::new(src),
60        }
61    }
62    /// Get the object identified by given ID. If the object is not cached, try
63    /// recovering its cache from provided source. If there is no space for
64    /// another object, the last recently accessed cache will be disposed.
65    pub fn create(&self, id: &str) -> Result<Arc<CacheItem<T>>> {
66        self._get(&id, true)
67    }
68    /// Get the object identified by given ID. If the object is not cached,
69    /// error will be returned. If there is no space for another object, the
70    /// last recently accessed cache will be disposed.
71    pub fn get(&self, id: &str) -> Result<Arc<CacheItem<T>>> {
72        self._get(&id, false)
73    }
74
75    fn _get(&self, id: &str, create: bool) -> Result<Arc<CacheItem<T>>> {
76        // Not intended to introduce too much complexity.
77        let mut cache = self.cache.lock()
78            .map_err(|_| Error::internal(ERR_POISONED_THREAD))?;
79        if let Some(pos) = cache.iter()
80            .position(|item| item.id() == id) {
81            // Cache found.
82            let arc = cache.remove(pos);
83            cache.insert(0, arc.clone());
84            return Ok(arc)
85        } else {
86            // Requested resource is not yet cached. Load now.
87            let new_item = CacheItem::new(id, self.src.load(id, create)?);
88            let new_arc = Arc::new(new_item);
89            // Not actually caching anything when capacity is 0.
90            if cache.capacity() == 0 {
91                return Ok(new_arc)
92            }
93            // Remove the least-recently-used item from collection.
94            if cache.len() == cache.capacity() {
95                let lru_item = cache.pop().unwrap();
96                // Unload items only when they are dirty.
97                if lru_item.is_dirty() {
98                    let data = lru_item.write()?;
99                    if let Err(err) = self.src.unload(lru_item.id(), &*data) {
100                        error!("Unable to unload '{}': {}", id, err);
101                    }   
102                }
103            }
104            cache.insert(0, new_arc.clone());
105            return Ok(new_arc)
106        }
107    }
108
109    /// Remove the object identified by given ID.
110    pub fn remove(&self, id: &str) -> Result<()> {
111        let mut cache = self.cache.lock().unwrap();
112        cache.iter()
113            .position(|nid| nid.id() == id)
114            .map(|pos| cache.remove(pos));
115        self.src.remove(&id)
116    }
117
118    /// The maximum number of items can be cached at a same time. Tests only.
119    #[cfg(test)]
120    pub fn capacity(&self) -> usize {
121        // Only if the thread is poisoned `cache` will be unavailable.
122        self.cache.lock().unwrap().capacity()
123    }
124
125    /// Get the number of items cached. Tests only.
126    #[cfg(test)]
127    pub fn len(&self) -> usize {
128        // Only if the thread is poisoned `cache` will be unavailable.
129        self.cache.lock().unwrap().len()
130    }
131}
132
/// A source where cache can be generated from.
pub trait CacheSource: 'static + Send + Sync {
    /// The type of cached values produced and consumed by this source.
    type Value: 'static;
    /// Create a new cached object. Return a value defined by default
    /// configurations if `create` is set. In the course of creation, no state
    /// should be stored out of RAM, e.g., writing to files or calling to remote
    /// machines.
    fn load(&self, id: &str, create: bool) -> Result<Self::Value>;
    /// Unload a cached object. Implementations should write the value into a
    /// recoverable form of storage, e.g., serializing data into JSON, if
    /// necessary. Cache unloading is an optional process.
    fn unload(&self, _id: &str, _obj: &Self::Value) -> Result<()> {
        // Default: unloading is a no-op.
        Ok(())
    }
    /// Remove a cached object. Implementations should remove any associated
    /// data from storage and invalidate any recoverability. If a resource
    /// doesn't exist, the source shouldn't report an error. Cache removal is an
    /// optional process.
    fn remove(&self, _id: &str) -> Result<()> {
        // Default: removal is a no-op.
        Ok(())
    }
}
155impl<T: 'static> Drop for Cache<T> {
156    /// Implement drop so that modified cached data can be returned to source
157    /// properly.
158    fn drop(&mut self) {
159        let mut lock = self.cache.lock().unwrap();
160        while let Some(item) = lock.pop() {
161            if !item.is_dirty() { continue }
162            let guard = item.write().unwrap();
163            if let Err(err) = self.src.unload(item.id(), &guard) {
164                warn!("Unable to unload '{}': {}", item.id(), err);
165            }
166        }
167    }
168}
169
#[cfg(test)]
mod tests {
    use writium::prelude::*;
    /// Test source; the `bool` flag forces every `load` to fail when set.
    struct TestSource(bool);
    impl super::CacheSource for TestSource {
        type Value = &'static str;
        fn load(&self, id: &str, _create: bool) -> Result<Self::Value> {
            if self.0 {
                Err(Error::not_found(""))
            } else {
                Ok(&["cache0", "cache1", "cache2", "cache3"][id.parse::<usize>().unwrap()])
            }
        }
        fn unload(&self, _id: &str, _obj: &Self::Value) -> Result<()> {
            Ok(())
        }
        fn remove(&self, _id: &str) -> Result<()> {
            Ok(())
        }
    }
    type TestCache = super::Cache<&'static str>;

    /// Build a 3-slot cache; `fail` makes every load return `not_found`.
    fn make_cache(fail: bool) -> TestCache {
        TestCache::new(3, TestSource(fail))
    }

    #[test]
    fn test_cache() {
        let cache = make_cache(false);
        assert!(cache.get("0").is_ok());
        assert!(cache.get("1").is_ok());
        assert!(cache.get("2").is_ok());
    }
    #[test]
    fn test_cache_failure() {
        let cache = make_cache(true);
        assert!(cache.get("0").is_err());
        assert!(cache.get("1").is_err());
        assert!(cache.get("2").is_err());
    }
    #[test]
    fn test_max_cache() {
        let cache = make_cache(false);
        // `assert_eq!` reports both sides on failure, unlike `assert!(a == b)`.
        assert_eq!(cache.len(), 0);
        assert!(cache.get("0").is_ok());
        assert_eq!(cache.len(), 1);
        assert!(cache.get("1").is_ok());
        assert_eq!(cache.len(), 2);
        assert!(cache.get("2").is_ok());
        assert_eq!(cache.len(), 3);
        // A fourth distinct item must evict the LRU one, not grow the cache.
        assert!(cache.get("3").is_ok());
        assert_eq!(cache.len(), 3);
    }
    #[test]
    fn test_max_cache_failure() {
        let cache = make_cache(true);
        assert_eq!(cache.len(), 0);
        assert!(cache.get("0").is_err());
        assert_eq!(cache.len(), 0);
        assert!(cache.get("1").is_err());
        assert_eq!(cache.len(), 0);
        assert!(cache.get("2").is_err());
        assert_eq!(cache.len(), 0);
    }
    #[test]
    fn test_remove() {
        let cache = make_cache(false);
        assert!(cache.get("0").is_ok());
        assert_eq!(cache.len(), 1);
        assert!(cache.remove("0").is_ok());
        assert_eq!(cache.len(), 0);
        // Removing an absent resource is not an error (see `CacheSource`).
        assert!(cache.remove("0").is_ok());
    }
}
241}