actix_storage_dashmap/
actor.rs

1use std::sync::{atomic::AtomicBool, Arc};
2use std::time::{Duration, Instant};
3
4use actix::{Actor, Addr, Handler, SyncArbiter, SyncContext};
5use actix_storage::dev::actor::{
6    ExpiryRequest, ExpiryResponse, ExpiryStoreRequest, ExpiryStoreResponse, StoreRequest,
7    StoreResponse,
8};
9use dashmap::DashMap;
10use delay_queue::{Delay, DelayQueue};
11
/// The value representation that is stored in DashMap. Includes metadata for expiration logic.
///
/// A persistent value has `persist == true` and `timeout == None`; an expiring value has
/// `persist == false` and `timeout` set to its deadline.
struct Value {
    /// The stored payload.
    bytes: Arc<[u8]>,
    /// Deadline after which the value is considered expired, if it has one.
    timeout: Option<Instant>,
    /// When `true` the value never expires, whatever `timeout` holds.
    persist: bool,
    // nonce increases whenever a new value is set or expiration time changes
    nonce: usize,
}

impl Value {
    /// Creates a persistent value (no deadline) carrying the given nonce.
    pub fn new(bytes: Arc<[u8]>, nonce: usize) -> Self {
        Value {
            bytes,
            timeout: None,
            persist: true,
            nonce,
        }
    }

    /// Creates a value that expires `expires_in` from now, carrying the given nonce.
    pub fn new_expiring(bytes: Arc<[u8]>, nonce: usize, expires_in: Duration) -> Self {
        Value {
            bytes,
            timeout: Some(Instant::now() + expires_in),
            persist: false,
            nonce,
        }
    }

    /// Returns the time left until the deadline.
    ///
    /// Yields `None` when the value is persistent, has no deadline, or the deadline has
    /// already passed.
    pub fn expires_in(&self) -> Option<Duration> {
        if self.persist {
            return None;
        }
        self.timeout
            .and_then(|deadline| deadline.checked_duration_since(Instant::now()))
    }

    /// Makes the value expire `expires_in` from now, bumps the nonce and returns the new
    /// deadline.
    pub fn set_expires_in(&mut self, expires_in: Duration) -> Instant {
        let deadline = Instant::now() + expires_in;
        self.persist = false;
        self.timeout = Some(deadline);
        self.increase_nonce();
        deadline
    }

    /// Pushes the current deadline back by `expires_in`, bumps the nonce and returns the new
    /// deadline.
    ///
    /// A value without a live deadline (never had one, or was made persistent) is treated as
    /// if [`set_expires_in`](Self::set_expires_in) had been called, so the extension is never
    /// added on top of a stale, possibly elapsed, deadline.
    pub fn extend_expires_in(&mut self, expires_in: Duration) -> Instant {
        // A persisted value has no deadline worth extending, even if an old one is recorded.
        let live_deadline = if self.persist { None } else { self.timeout };
        match live_deadline {
            Some(deadline) => {
                let extended = deadline + expires_in;
                self.timeout = Some(extended);
                self.increase_nonce();
                extended
            }
            None => self.set_expires_in(expires_in),
        }
    }

    /// Advances the nonce, wrapping back to zero on overflow instead of panicking.
    fn increase_nonce(&mut self) {
        self.nonce = self.nonce.wrapping_add(1);
    }

    /// Makes the value persistent and forgets its previous deadline.
    pub fn persist(&mut self) {
        self.persist = true;
        // Drop the old deadline so a later extension starts from "now".
        self.timeout = None;
    }
}
77
/// Per-scope storage: maps a key to its stored [`Value`].
type ScopeMap = DashMap<Arc<[u8]>, Value>;
/// Top-level storage: maps a scope to the [`ScopeMap`] holding that scope's entries.
type InternalMap = DashMap<Arc<[u8]>, ScopeMap>;
/// (Scope, Key, Nonce)
///
/// The payload queued for expiration; the nonce is the entry's nonce at scheduling time.
type ExpiringKey = (Arc<[u8]>, Arc<[u8]>, usize);
82
/// An implementation of [`ExpiryStore`](actix_storage::dev::ExpiryStore) based on sync
/// actix actors and DashMap
///
/// It relies on delay_queue crate to provide expiration.
///
/// ## Example
/// ```no_run
/// use actix_storage::Storage;
/// use actix_storage_dashmap::DashMapActor;
/// use actix_web::{App, HttpServer};
///
/// #[actix_web::main]
/// async fn main() -> std::io::Result<()> {
///     const THREADS_NUMBER: usize = 4;
///     let store = DashMapActor::start_default(THREADS_NUMBER);
///     // OR
///     let store = DashMapActor::with_capacity(100).start(THREADS_NUMBER);
///
///     let storage = Storage::build().expiry_store(store).finish();
///     let server = HttpServer::new(move || {
///         App::new()
///             .data(storage.clone())
///     });
///     server.bind("localhost:5000")?.run().await
/// }
/// ```
///
/// requires ["actor"] feature
#[derive(Clone, Default)]
pub struct DashMapActor {
    // Scope -> key -> value storage; the `Arc` lets every clone of the actor share one map.
    map: Arc<InternalMap>,
    // Pending expirations as (scope, key, nonce) entries; handlers push to it and the
    // background thread spawned in `started` pops from a clone of it.
    // NOTE(review): presumably clones of `DelayQueue` share one underlying queue — confirm.
    queue: DelayQueue<Delay<ExpiringKey>>,

    // Flag polled by the background expiration thread to decide when to exit its loop.
    #[doc(hidden)]
    stopped: Arc<AtomicBool>,
}
119
120impl DashMapActor {
121    /// Makes a new DashMapActor without starting it
122    #[must_use = "Actor should be started to work by calling `start`"]
123    pub fn new() -> Self {
124        Self::default()
125    }
126
127    /// Makes a new DashMapActor with specified DashMap capacity without starting it
128    #[must_use = "Actor should be started to work by calling `start`"]
129    pub fn with_capacity(capacity: usize) -> Self {
130        Self {
131            map: DashMap::with_capacity(capacity).into(),
132            queue: DelayQueue::default(),
133            stopped: Arc::new(AtomicBool::new(false)),
134        }
135    }
136
137    /// Create default actor and start the actor in an actix sync arbiter with specified
138    /// number of threads
139    pub fn start_default(threads_num: usize) -> Addr<Self> {
140        let storage = Self::default();
141        SyncArbiter::start(threads_num, move || storage.clone())
142    }
143
144    /// Start the actor in an actix sync arbiter with specified number of threads
145    pub fn start(self, threads_num: usize) -> Addr<Self> {
146        SyncArbiter::start(threads_num, move || self.clone())
147    }
148}
149
150impl Actor for DashMapActor {
151    type Context = SyncContext<Self>;
152
153    fn started(&mut self, _: &mut Self::Context) {
154        let map = self.map.clone();
155        let mut queue = self.queue.clone();
156
157        let stopped = self.stopped.clone();
158
159        std::thread::spawn(move || loop {
160            if let Some(item) = queue.try_pop_for(Duration::from_secs(1)) {
161                let mut should_delete = false;
162                let scope = &item.value.0;
163                let key = &item.value.1;
164                let nonce = item.value.2;
165                if let Some(scope_map) = map.get_mut(scope) {
166                    if let Some(value) = scope_map.get(key) {
167                        if value.nonce != nonce {
168                            continue;
169                        }
170
171                        if !value.persist {
172                            should_delete = true;
173                        }
174                    }
175                };
176                if should_delete {
177                    map.get_mut(scope)
178                        .and_then(|scope_map| scope_map.remove(key));
179                }
180            } else if stopped.load(std::sync::atomic::Ordering::Relaxed) {
181                break;
182            }
183        });
184    }
185}
186
187impl Handler<StoreRequest> for DashMapActor {
188    type Result = StoreResponse;
189
190    fn handle(&mut self, msg: StoreRequest, _: &mut Self::Context) -> Self::Result {
191        match msg {
192            StoreRequest::Set(scope, key, value) => {
193                self.map
194                    .entry(scope)
195                    .or_default()
196                    .entry(key)
197                    .and_modify(|val| {
198                        val.nonce += 1;
199                        val.bytes = value.clone();
200                    })
201                    .or_insert_with(|| Value::new(value, 0));
202                StoreResponse::Set(Ok(()))
203            }
204            StoreRequest::Get(scope, key) => {
205                let value = if let Some(scope_map) = self.map.get(&scope) {
206                    scope_map.get(&key).map(|val| val.bytes.clone())
207                } else {
208                    None
209                };
210                StoreResponse::Get(Ok(value))
211            }
212            StoreRequest::Delete(scope, key) => {
213                self.map
214                    .get_mut(&scope)
215                    .and_then(|scope_map| scope_map.remove(&key));
216                StoreResponse::Delete(Ok(()))
217            }
218            StoreRequest::Contains(scope, key) => {
219                let contains = self
220                    .map
221                    .get(&scope)
222                    .map(|scope_map| scope_map.contains_key(&key))
223                    .unwrap_or(false);
224                StoreResponse::Contains(Ok(contains))
225            }
226        }
227    }
228}
229
230impl Handler<ExpiryRequest> for DashMapActor {
231    type Result = ExpiryResponse;
232
233    fn handle(&mut self, msg: ExpiryRequest, _: &mut Self::Context) -> Self::Result {
234        match msg {
235            ExpiryRequest::Set(scope, key, expires_in) => {
236                if let Some(scope_map) = self.map.get_mut(&scope) {
237                    if let Some(mut val) = scope_map.get_mut(&key) {
238                        let timeout = val.set_expires_in(expires_in);
239                        self.queue
240                            .push(Delay::until_instant((scope, key, val.nonce), timeout));
241                    }
242                }
243                ExpiryResponse::Set(Ok(()))
244            }
245            ExpiryRequest::Persist(scope, key) => {
246                if let Some(scope_map) = self.map.get_mut(&scope) {
247                    if let Some(mut val) = scope_map.get_mut(&key) {
248                        val.persist();
249                    }
250                }
251                ExpiryResponse::Persist(Ok(()))
252            }
253            ExpiryRequest::Get(scope, key) => {
254                let item = if let Some(scope_map) = self.map.get(&scope) {
255                    scope_map.get(&key).and_then(|val| val.expires_in())
256                } else {
257                    None
258                };
259                ExpiryResponse::Get(Ok(item))
260            }
261            ExpiryRequest::Extend(scope, key, duration) => {
262                if let Some(scope_map) = self.map.get_mut(&scope) {
263                    if let Some(mut val) = scope_map.get_mut(&key) {
264                        let new_timeout = val.extend_expires_in(duration);
265                        self.queue
266                            .push(Delay::until_instant((scope, key, val.nonce), new_timeout));
267                    }
268                }
269                ExpiryResponse::Extend(Ok(()))
270            }
271        }
272    }
273}
274
275impl Handler<ExpiryStoreRequest> for DashMapActor {
276    type Result = ExpiryStoreResponse;
277
278    fn handle(&mut self, msg: ExpiryStoreRequest, _: &mut Self::Context) -> Self::Result {
279        match msg {
280            ExpiryStoreRequest::SetExpiring(scope, key, value, expires_in) => {
281                let scope_map = self.map.entry(scope.clone()).or_default();
282                let val = scope_map
283                    .entry(key.clone())
284                    .and_modify(|val| {
285                        val.nonce += 1;
286                        val.bytes = value.clone();
287                        val.set_expires_in(expires_in);
288                    })
289                    .or_insert_with(|| Value::new_expiring(value, 0, expires_in));
290                self.queue
291                    .push(Delay::for_duration((scope, key, val.nonce), expires_in));
292                ExpiryStoreResponse::SetExpiring(Ok(()))
293            }
294            ExpiryStoreRequest::GetExpiring(scope, key) => {
295                let values = if let Some(scope_map) = self.map.get(&scope) {
296                    scope_map
297                        .get(&key)
298                        .map(|val| (val.bytes.clone(), val.expires_in()))
299                } else {
300                    None
301                };
302
303                ExpiryStoreResponse::GetExpiring(Ok(values))
304            }
305        }
306    }
307}
308
#[cfg(test)]
mod test {
    use super::*;
    use actix_storage::tests::*;

    /// Starts a default actor on a sync arbiter with a single thread.
    fn single_thread_store() -> Addr<DashMapActor> {
        DashMapActor::default().start(1)
    }

    #[test]
    fn test_dashmap_store() {
        test_store(Box::pin(async { single_thread_store() }));
    }

    #[test]
    fn test_dashmap_expiry() {
        let stores = Box::pin(async {
            let store = single_thread_store();
            (store.clone(), store)
        });
        test_expiry(stores, 2);
    }

    #[test]
    fn test_dashmap_expiry_store() {
        test_expiry_store(Box::pin(async { single_thread_store() }), 2);
    }

    #[test]
    fn test_dashmap_formats() {
        test_all_formats(Box::pin(async { single_thread_store() }));
    }
}