redis_cell/cell/
store.rs

1extern crate time;
2
3use error::CellError;
4use redis;
5use std::collections::HashMap;
6
7/// Store exposes the atomic data store operations that the GCRA rate limiter
8/// needs to function correctly.
9///
10/// Note that because the default mode for this library is to run within Redis
11/// (making every operation atomic by default), the encapsulation is not
12/// strictly needed. However, it's written to be generic enough that a
13/// out-of-Redis Store could be written and have the rate limiter still work
14/// properly.
15pub trait Store {
16    /// Compares the value at the given key with a known old value and swaps it
17    /// for a new value if and only if they're equal. Also sets the key's TTL
18    /// until it expires.
19    fn compare_and_swap_with_ttl(
20        &mut self,
21        key: &str,
22        old: i64,
23        new: i64,
24        ttl: time::Duration,
25    ) -> Result<bool, CellError>;
26
27    /// Gets the given key's value and the current time as dictated by the
28    /// store (this is done so that rate limiters running on a variety of
29    /// different nodes can operate with a consistent clock instead of using
30    /// their own). If the key was unset, -1 is returned.
31    fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError>;
32
33    /// Logs a debug message to the data store.
34    fn log_debug(&self, message: &str);
35
36    /// Sets the given key to the given value if and only if it doesn't already
37    /// exit. Whether or not the key existed previously it's given a new TTL.
38    fn set_if_not_exists_with_ttl(
39        &mut self,
40        key: &str,
41        value: i64,
42        ttl: time::Duration,
43    ) -> Result<bool, CellError>;
44}
45
/// `MemoryStore` is a simple implementation of `Store` that persists data in an
/// in-memory `HashMap`.
///
/// Note that the implementation is currently not thread-safe and will need a
/// mutex added if it's ever used for anything serious.
#[derive(Default)]
pub struct MemoryStore {
    /// Current value held for each key.
    map: HashMap<String, i64>,
    /// When `true`, debug messages are printed to stdout; otherwise they are
    /// dropped.
    verbose: bool,
}

impl MemoryStore {
    /// Creates an empty store with debug output disabled.
    pub fn new() -> MemoryStore {
        Self::default()
    }

    /// Creates an empty store with debug output enabled.
    pub fn new_verbose() -> MemoryStore {
        // Start from the defaults so any field added later is picked up
        // automatically; only the verbosity flag differs from `new()`.
        MemoryStore {
            verbose: true,
            ..Self::default()
        }
    }
}
69
70impl Store for MemoryStore {
71    fn compare_and_swap_with_ttl(
72        &mut self,
73        key: &str,
74        old: i64,
75        new: i64,
76        _: time::Duration,
77    ) -> Result<bool, CellError> {
78        match self.map.get(key) {
79            Some(n) if *n != old => return Ok(false),
80            _ => (),
81        };
82
83        self.map.insert(String::from(key), new);
84        Ok(true)
85    }
86
87    fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError> {
88        match self.map.get(key) {
89            Some(n) => Ok((*n, time::now_utc())),
90            None => Ok((-1, time::now_utc())),
91        }
92    }
93
94    fn log_debug(&self, message: &str) {
95        if self.verbose {
96            println!("memory_store: {}", message);
97        }
98    }
99
100    fn set_if_not_exists_with_ttl(
101        &mut self,
102        key: &str,
103        value: i64,
104        _: time::Duration,
105    ) -> Result<bool, CellError> {
106        match self.map.get(key) {
107            Some(_) => Ok(false),
108            None => {
109                self.map.insert(String::from(key), value);
110                Ok(true)
111            }
112        }
113    }
114}
115
116/// `InternalRedisStore` is a store implementation that uses Redis module APIs in
117/// that it's designed to run from within a Redis runtime. This allows us to
118/// cut some corners around atomicity because we can safety assume that all
119/// operations will be atomic.
120pub struct InternalRedisStore<'a> {
121    r: &'a redis::Redis,
122}
123
124impl<'a> InternalRedisStore<'a> {
125    pub fn new(r: &'a redis::Redis) -> InternalRedisStore<'a> {
126        InternalRedisStore { r }
127    }
128}
129
130impl<'a> Store for InternalRedisStore<'a> {
131    fn compare_and_swap_with_ttl(
132        &mut self,
133        key: &str,
134        old: i64,
135        new: i64,
136        ttl: time::Duration,
137    ) -> Result<bool, CellError> {
138        let key = self.r.open_key_writable(key);
139        match key.read()? {
140            Some(s) => {
141                // While we will usually have a value here to parse, it's possible that
142                // in the case of a very fast rate the key's already been
143                // expired even since the beginning of this operation.
144                // Check whether the value is empty to handle that possibility.
145                if !s.is_empty() && s.parse::<i64>()? == old {
146                    // Still the old value: perform the swap.
147                    key.write(new.to_string().as_str())?;
148                    key.set_expire(ttl)?;
149                    Ok(true)
150                } else {
151                    // Not the old value: something else must have set it. Take no
152                    // action.
153                    Ok(false)
154                }
155            }
156
157            // Value wasn't set.
158            None => Ok(false),
159        }
160    }
161
162    fn get_with_time(&self, key: &str) -> Result<(i64, time::Tm), CellError> {
163        // TODO: currently leveraging that CommandError and CellError are the
164        // same thing, but we should probably reconcile this.
165        let key = self.r.open_key(key);
166        match key.read()? {
167            Some(s) => {
168                let n = s.parse::<i64>()?;
169                Ok((n, time::now_utc()))
170            }
171            None => Ok((-1, time::now_utc())),
172        }
173    }
174
175    fn log_debug(&self, message: &str) {
176        self.r.log_debug(message);
177    }
178
179    fn set_if_not_exists_with_ttl(
180        &mut self,
181        key: &str,
182        value: i64,
183        ttl: time::Duration,
184    ) -> Result<bool, CellError> {
185        let key = self.r.open_key_writable(key);
186        let res = if key.is_empty()? {
187            key.write(value.to_string().as_str())?;
188            Ok(true)
189        } else {
190            Ok(false)
191        };
192        key.set_expire(ttl)?;
193        res
194    }
195}
196
#[cfg(test)]
mod tests {
    extern crate time;

    use cell::store::*;

    /// Covers the three compare-and-swap outcomes on `MemoryStore`: a fresh
    /// key, a matching old value, and a mismatching old value.
    #[test]
    fn it_performs_compare_and_swap_with_ttl() {
        let mut store = MemoryStore::default();

        // First attempt works: the key is not set yet, so nothing conflicts.
        let res1 =
            store.compare_and_swap_with_ttl("foo", 123, 124, time::Duration::zero());
        assert!(res1.unwrap());

        // Second attempt succeeds: we use the value we just set combined with
        // a new value.
        let res2 =
            store.compare_and_swap_with_ttl("foo", 124, 125, time::Duration::zero());
        assert!(res2.unwrap());

        // Third attempt fails: we try to overwrite using a value that is
        // incorrect.
        let res3 =
            store.compare_and_swap_with_ttl("foo", 123, 126, time::Duration::zero());
        assert!(!res3.unwrap());
    }

    /// Checks that an unset key reads back as -1 and a set key reads back as
    /// the stored value.
    #[test]
    fn it_performs_get_with_time() {
        let mut store = MemoryStore::default();

        let res1 = store.get_with_time("foo");
        assert_eq!(-1, res1.unwrap().0);

        // Now try setting a value. The key is fresh, so the write must be
        // reported as having happened.
        let was_set = store
            .set_if_not_exists_with_ttl("foo", 123, time::Duration::zero())
            .unwrap();
        assert!(was_set);

        let res2 = store.get_with_time("foo");
        assert_eq!(123, res2.unwrap().0);
    }

    /// Checks that only the first write to a key is reported as successful.
    #[test]
    fn it_performs_set_if_not_exists_with_ttl() {
        let mut store = MemoryStore::default();

        let res1 = store.set_if_not_exists_with_ttl("foo", 123, time::Duration::zero());
        assert!(res1.unwrap());

        let res2 = store.set_if_not_exists_with_ttl("foo", 123, time::Duration::zero());
        assert!(!res2.unwrap());
    }
}