actix_storage_hashmap/actor/
mod.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use actix::{
5    Actor, ActorContext, ActorFutureExt, Addr, AsyncContext, Context, Handler, ResponseActFuture,
6    StreamHandler, WrapFuture,
7};
8use actix_storage::dev::actor::{
9    ExpiryRequest, ExpiryResponse, ExpiryStoreRequest, ExpiryStoreResponse, StoreRequest,
10    StoreResponse,
11};
12
13mod delayqueue;
14use delayqueue::{delayqueue, DelayQueueEmergency, DelayQueueReceiver, DelayQueueSender, Expired};
15
/// Key → value pairs stored inside a single scope.
type ScopeMap = HashMap<Arc<[u8]>, Arc<[u8]>>;
/// The whole store: each scope maps to that scope's `ScopeMap`.
type InternalMap = HashMap<Arc<[u8]>, ScopeMap>;
18
/// Identifies one stored entry for the expiry queue: the scope it lives in
/// together with its key inside that scope.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct ExpiryKey {
    pub(crate) scope: Arc<[u8]>,
    pub(crate) key: Arc<[u8]>,
}

impl ExpiryKey {
    /// Pairs `scope` and `key` into a single expiry-queue identifier.
    pub fn new(scope: Arc<[u8]>, key: Arc<[u8]>) -> Self {
        ExpiryKey { scope, key }
    }
}
30
/// An implementation of [`ExpiryStore`](actix_storage::dev::ExpiryStore) based on async
/// actix actors and a `HashMap`.
///
/// It relies on tokio's DelayQueue internally to manage expiration, and it doesn't need any
/// lock as it runs in a single-threaded async arbiter.
///
/// ## Example
/// ```no_run
/// use actix_storage::Storage;
/// use actix_storage_hashmap::HashMapActor;
/// use actix_web::{App, HttpServer};
///
/// #[actix_web::main]
/// async fn main() -> std::io::Result<()> {
///     let store = HashMapActor::start_default();
///     // OR
///     let store = HashMapActor::with_capacity(100).start();
///
///     let storage = Storage::build().expiry_store(store).finish();
///     let server = HttpServer::new(move || {
///         App::new()
///             .data(storage.clone())
///     });
///     server.bind("localhost:5000")?.run().await
/// }
/// ```
///
/// Requires the `actor` feature.
#[derive(Debug)]
pub struct HashMapActor {
    /// Stored values, grouped first by scope and then by key.
    map: InternalMap,
    /// Sender used by the handlers to issue expiry commands
    /// (`insert_or_update`, `remove`, `get`, `extend`).
    exp: DelayQueueSender<ExpiryKey>,
    /// Used in `started` to `restart()` the expiry channel and obtain a new
    /// receiver when `exp_receiver` is no longer available.
    emergency_channel: DelayQueueEmergency<ExpiryKey>,

    /// Receiving side of the expiry channel; it is taken out in `started`
    /// and registered on the actor context as a stream.
    #[doc(hidden)]
    exp_receiver: Option<DelayQueueReceiver<ExpiryKey>>,
}
68
/// Input buffer size handed to `delayqueue` when the caller does not pick one.
const DEFAULT_INPUT_CHANNEL_SIZE: usize = 16;
/// Output buffer size handed to `delayqueue` when the caller does not pick one.
const DEFAULT_OUTPUT_CHANNEL_SIZE: usize = 16;
71
72impl HashMapActor {
73    /// Makes a new HashMapActor without starting it
74    #[must_use = "Actor should be started to work by calling `start`"]
75    pub fn new() -> Self {
76        Self::default()
77    }
78
79    /// Makes a new HashMapActor with specified HashMap capacity without starting it
80    #[must_use = "Actor should be started to work by calling `start`"]
81    pub fn with_capacity(capacity: usize) -> Self {
82        let (tx, rx, etx) = delayqueue(DEFAULT_INPUT_CHANNEL_SIZE, DEFAULT_OUTPUT_CHANNEL_SIZE);
83        Self {
84            map: HashMap::with_capacity(capacity),
85            exp: tx,
86            exp_receiver: Some(rx),
87            emergency_channel: etx,
88        }
89    }
90
91    /// Makes a new HashMapActor with specified channel capacity without starting it
92    ///
93    /// Buffer sizes are used for internal expiry channel provider, input is for the channel
94    /// providing commands expire/extend/expiry/persist and output is the other channel that
95    /// sends back expired items.
96    #[must_use = "Actor should be started to work by calling `start`"]
97    pub fn with_channel_size(input_buffer: usize, output_buffer: usize) -> Self {
98        let (tx, rx, etx) = delayqueue(input_buffer, output_buffer);
99        Self {
100            map: HashMap::new(),
101            exp: tx,
102            exp_receiver: Some(rx),
103            emergency_channel: etx,
104        }
105    }
106
107    /// Makes a new HashMapActor with specified HashMap and channel capacity without starting it
108    ///
109    /// Buffer sizes are used for internal expiry channel provider, input is for the channel
110    /// providing commands expire/extend/expiry/persist and output is the other channel that
111    /// sends back expired items.
112    #[must_use = "Actor should be started to work by calling `start`"]
113    pub fn with_capacity_and_channel_size(
114        capacity: usize,
115        input_buffer: usize,
116        output_buffer: usize,
117    ) -> Self {
118        let (tx, rx, etx) = delayqueue(input_buffer, output_buffer);
119        Self {
120            map: HashMap::with_capacity(capacity),
121            exp: tx,
122            exp_receiver: Some(rx),
123            emergency_channel: etx,
124        }
125    }
126
127    /// Equivalent of actix::Actor::start for when actix::Actor is not in scope
128    pub fn start(self) -> Addr<Self> {
129        Actor::start(self)
130    }
131
132    /// Equivalent of actix::Actor::start_default for when actix::Actor is not in scope
133    pub fn start_default() -> Addr<Self> {
134        <Self as Actor>::start_default()
135    }
136}
137
138impl Default for HashMapActor {
139    fn default() -> Self {
140        let (tx, rx, etx) = delayqueue(DEFAULT_INPUT_CHANNEL_SIZE, DEFAULT_OUTPUT_CHANNEL_SIZE);
141        Self {
142            map: HashMap::new(),
143            exp: tx,
144            exp_receiver: Some(rx),
145            emergency_channel: etx,
146        }
147    }
148}
149
150impl Actor for HashMapActor {
151    type Context = Context<Self>;
152
153    fn started(&mut self, ctx: &mut Self::Context) {
154        // if there receiver channel is available, we take it
155        // otherwise we establish a new receiving channel through emergency channel
156        if self.exp_receiver.is_some() {
157            let rx = std::mem::take(&mut self.exp_receiver).unwrap();
158            ctx.add_stream(rx);
159        } else {
160            let mut etx = self.emergency_channel.clone();
161            ctx.wait(Box::pin(
162                async move { etx.restart().await }
163                    .into_actor(self)
164                    .map(|message, _, ctx| {
165                        match message {
166                            Ok(ch) => {
167                                ctx.add_stream(ch);
168                            }
169                            Err(err) => {
170                                // Something went wrong with the channel, we stop
171                                log::error!(
172                                    "Expiration channel closed and could not be recovered. {}",
173                                    err
174                                );
175                                ctx.terminate();
176                            }
177                        }
178                    }),
179            ));
180        }
181    }
182}
183
184impl StreamHandler<Expired<ExpiryKey>> for HashMapActor {
185    fn handle(&mut self, item: Expired<ExpiryKey>, _: &mut Self::Context) {
186        let item = item.into_inner();
187        self.map
188            .get_mut(&item.scope)
189            .and_then(|scope_map| scope_map.remove(&item.key));
190    }
191}
192
193impl Handler<StoreRequest> for HashMapActor {
194    type Result = ResponseActFuture<Self, StoreResponse>;
195
196    fn handle(&mut self, msg: StoreRequest, ctx: &mut Self::Context) -> Self::Result {
197        match msg {
198            StoreRequest::Set(scope, key, value) => {
199                if self
200                    .map
201                    .entry(scope.clone())
202                    .or_default()
203                    .insert(key.clone(), value)
204                    .is_some()
205                {
206                    // Remove the key from expiry if it already exists
207                    let mut exp = self.exp.clone();
208                    Box::pin(
209                        async move {
210                            if let Err(err) = exp.remove(ExpiryKey::new(scope, key)).await {
211                                log::error!("{}", err);
212                            }
213                        }
214                        .into_actor(self)
215                        .map(move |_, _, _| StoreResponse::Set(Ok(()))),
216                    )
217                } else {
218                    Box::pin(async { StoreResponse::Set(Ok(())) }.into_actor(self))
219                }
220            }
221            StoreRequest::Get(scope, key) => {
222                let val = self
223                    .map
224                    .get(&scope)
225                    .and_then(|scope_map| scope_map.get(&key))
226                    .cloned();
227                Box::pin(async move { StoreResponse::Get(Ok(val)) }.into_actor(self))
228            }
229            StoreRequest::Delete(scope, key) => {
230                if self
231                    .map
232                    .get_mut(&scope)
233                    .and_then(|scope_map| scope_map.remove(&key))
234                    .is_some()
235                {
236                    // Remove key from expiry if the item actually existed and was removed
237                    let mut exp = self.exp.clone();
238                    ctx.spawn(
239                        async move {
240                            if let Err(err) = exp.remove(ExpiryKey::new(scope, key)).await {
241                                log::error!("{}", err);
242                            }
243                        }
244                        .into_actor(self),
245                    );
246                }
247                Box::pin(async { StoreResponse::Delete(Ok(())) }.into_actor(self))
248            }
249            StoreRequest::Contains(scope, key) => {
250                let con = self
251                    .map
252                    .get(&scope)
253                    .map(|scope_map| scope_map.contains_key(&key))
254                    .unwrap_or(false);
255                Box::pin(async move { StoreResponse::Contains(Ok(con)) }.into_actor(self))
256            }
257        }
258    }
259}
260
261impl Handler<ExpiryRequest> for HashMapActor {
262    type Result = ResponseActFuture<Self, ExpiryResponse>;
263
264    fn handle(&mut self, msg: ExpiryRequest, _: &mut Self::Context) -> Self::Result {
265        match msg {
266            ExpiryRequest::Set(scope, key, expires_in) => {
267                if self
268                    .map
269                    .get(&scope)
270                    .map(|scope_map| scope_map.contains_key(&key))
271                    .unwrap_or(false)
272                {
273                    let mut exp = self.exp.clone();
274                    Box::pin(
275                        async move {
276                            if let Err(err) = exp
277                                .insert_or_update(ExpiryKey::new(scope, key), expires_in)
278                                .await
279                            {
280                                log::error!("{}", err);
281                            }
282                        }
283                        .into_actor(self)
284                        .map(move |_, _, _| ExpiryResponse::Set(Ok(()))),
285                    )
286                } else {
287                    // The key does not exist, we return Ok here as the non-existent key may be expired before
288                    Box::pin(async { ExpiryResponse::Set(Ok(())) }.into_actor(self))
289                }
290            }
291            ExpiryRequest::Persist(scope, key) => {
292                if self
293                    .map
294                    .get(&scope)
295                    .map(|scope_map| scope_map.contains_key(&key))
296                    .unwrap_or(false)
297                {
298                    let mut exp = self.exp.clone();
299                    Box::pin(
300                        async move {
301                            if let Err(err) = exp.remove(ExpiryKey::new(scope, key)).await {
302                                log::error!("{}", err);
303                            }
304                        }
305                        .into_actor(self)
306                        .map(move |_, _, _| ExpiryResponse::Persist(Ok(()))),
307                    )
308                } else {
309                    Box::pin(async { ExpiryResponse::Persist(Ok(())) }.into_actor(self))
310                }
311            }
312            ExpiryRequest::Get(scope, key) => {
313                let mut exp = self.exp.clone();
314                Box::pin(
315                    async move {
316                        match exp.get(ExpiryKey::new(scope, key)).await {
317                            Ok(val) => val,
318                            Err(err) => {
319                                log::error!("{}", err);
320                                None
321                            }
322                        }
323                    }
324                    .into_actor(self)
325                    .map(|val, _, _| ExpiryResponse::Get(Ok(val))),
326                )
327            }
328            ExpiryRequest::Extend(scope, key, duration) => {
329                let mut exp = self.exp.clone();
330                Box::pin(
331                    async move { exp.extend(ExpiryKey::new(scope, key), duration).await }
332                        .into_actor(self)
333                        .map(|_, _, _| ExpiryResponse::Extend(Ok(()))),
334                )
335            }
336        }
337    }
338}
339
340impl Handler<ExpiryStoreRequest> for HashMapActor {
341    type Result = ResponseActFuture<Self, ExpiryStoreResponse>;
342
343    fn handle(&mut self, msg: ExpiryStoreRequest, _: &mut Self::Context) -> Self::Result {
344        match msg {
345            ExpiryStoreRequest::SetExpiring(scope, key, value, expires_in) => {
346                self.map
347                    .entry(scope.clone())
348                    .or_default()
349                    .insert(key.clone(), value);
350                let mut exp = self.exp.clone();
351                Box::pin(
352                    async move {
353                        exp.insert_or_update(ExpiryKey::new(scope, key), expires_in)
354                            .await
355                    }
356                    .into_actor(self)
357                    .map(move |_, _, _| ExpiryStoreResponse::SetExpiring(Ok(()))),
358                )
359            }
360            ExpiryStoreRequest::GetExpiring(scope, key) => {
361                let val = self
362                    .map
363                    .get(&scope)
364                    .and_then(|scope_map| scope_map.get(&key))
365                    .cloned();
366                if let Some(val) = val {
367                    let mut exp = self.exp.clone();
368                    Box::pin(
369                        async move {
370                            match exp.get(ExpiryKey::new(scope, key)).await {
371                                Ok(val) => val,
372                                Err(err) => {
373                                    log::error!("{}", err);
374                                    None
375                                }
376                            }
377                        }
378                        .into_actor(self)
379                        .map(|expiry, _, _| {
380                            ExpiryStoreResponse::GetExpiring(Ok(Some((val, expiry))))
381                        }),
382                    )
383                } else {
384                    Box::pin(async { ExpiryStoreResponse::GetExpiring(Ok(None)) }.into_actor(self))
385                }
386            }
387        }
388    }
389}
390
// Each test hands a future that resolves to a freshly started `HashMapActor`
// to one of the helper suites glob-imported from `actix_storage::tests`.
#[cfg(test)]
mod test {
    use super::*;
    use actix_storage::tests::*;

    /// Runs the `test_store` helper suite against the actor.
    #[test]
    fn test_hashmap_store() {
        test_store(Box::pin(async { HashMapActor::start_default() }));
    }

    /// Runs the `test_expiry` helper suite, passing the same actor address twice.
    #[test]
    fn test_hashmap_expiry() {
        test_expiry(
            Box::pin(async {
                let store = HashMapActor::start_default();
                (store.clone(), store)
            }),
            // NOTE(review): the meaning of `2` is defined by `test_expiry`, which is
            // not visible here — presumably a delay in seconds; confirm.
            2,
        );
    }

    /// Runs the `test_expiry_store` helper suite against the actor.
    #[test]
    fn test_hashmap_expiry_store() {
        // NOTE(review): `2` presumably has the same meaning as in `test_expiry` — confirm.
        test_expiry_store(Box::pin(async { HashMapActor::start_default() }), 2);
    }

    /// Runs the `test_all_formats` helper suite against the actor.
    #[test]
    fn test_hashmap_formats() {
        test_all_formats(Box::pin(async { HashMapActor::start_default() }));
    }
}