actix_storage_hashmap/actor/
mod.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use actix::{
5 Actor, ActorContext, ActorFutureExt, Addr, AsyncContext, Context, Handler, ResponseActFuture,
6 StreamHandler, WrapFuture,
7};
8use actix_storage::dev::actor::{
9 ExpiryRequest, ExpiryResponse, ExpiryStoreRequest, ExpiryStoreResponse, StoreRequest,
10 StoreResponse,
11};
12
13mod delayqueue;
14use delayqueue::{delayqueue, DelayQueueEmergency, DelayQueueReceiver, DelayQueueSender, Expired};
15
16type ScopeMap = HashMap<Arc<[u8]>, Arc<[u8]>>;
17type InternalMap = HashMap<Arc<[u8]>, ScopeMap>;
18
19#[derive(Debug, Hash, PartialEq, Eq, Clone)]
20struct ExpiryKey {
21 pub(crate) scope: Arc<[u8]>,
22 pub(crate) key: Arc<[u8]>,
23}
24
25impl ExpiryKey {
26 pub fn new(scope: Arc<[u8]>, key: Arc<[u8]>) -> Self {
27 Self { scope, key }
28 }
29}
30
31#[derive(Debug)]
60pub struct HashMapActor {
61 map: InternalMap,
62 exp: DelayQueueSender<ExpiryKey>,
63 emergency_channel: DelayQueueEmergency<ExpiryKey>,
64
65 #[doc(hidden)]
66 exp_receiver: Option<DelayQueueReceiver<ExpiryKey>>,
67}
68
69const DEFAULT_INPUT_CHANNEL_SIZE: usize = 16;
70const DEFAULT_OUTPUT_CHANNEL_SIZE: usize = 16;
71
72impl HashMapActor {
73 #[must_use = "Actor should be started to work by calling `start`"]
75 pub fn new() -> Self {
76 Self::default()
77 }
78
79 #[must_use = "Actor should be started to work by calling `start`"]
81 pub fn with_capacity(capacity: usize) -> Self {
82 let (tx, rx, etx) = delayqueue(DEFAULT_INPUT_CHANNEL_SIZE, DEFAULT_OUTPUT_CHANNEL_SIZE);
83 Self {
84 map: HashMap::with_capacity(capacity),
85 exp: tx,
86 exp_receiver: Some(rx),
87 emergency_channel: etx,
88 }
89 }
90
91 #[must_use = "Actor should be started to work by calling `start`"]
97 pub fn with_channel_size(input_buffer: usize, output_buffer: usize) -> Self {
98 let (tx, rx, etx) = delayqueue(input_buffer, output_buffer);
99 Self {
100 map: HashMap::new(),
101 exp: tx,
102 exp_receiver: Some(rx),
103 emergency_channel: etx,
104 }
105 }
106
107 #[must_use = "Actor should be started to work by calling `start`"]
113 pub fn with_capacity_and_channel_size(
114 capacity: usize,
115 input_buffer: usize,
116 output_buffer: usize,
117 ) -> Self {
118 let (tx, rx, etx) = delayqueue(input_buffer, output_buffer);
119 Self {
120 map: HashMap::with_capacity(capacity),
121 exp: tx,
122 exp_receiver: Some(rx),
123 emergency_channel: etx,
124 }
125 }
126
127 pub fn start(self) -> Addr<Self> {
129 Actor::start(self)
130 }
131
132 pub fn start_default() -> Addr<Self> {
134 <Self as Actor>::start_default()
135 }
136}
137
138impl Default for HashMapActor {
139 fn default() -> Self {
140 let (tx, rx, etx) = delayqueue(DEFAULT_INPUT_CHANNEL_SIZE, DEFAULT_OUTPUT_CHANNEL_SIZE);
141 Self {
142 map: HashMap::new(),
143 exp: tx,
144 exp_receiver: Some(rx),
145 emergency_channel: etx,
146 }
147 }
148}
149
150impl Actor for HashMapActor {
151 type Context = Context<Self>;
152
153 fn started(&mut self, ctx: &mut Self::Context) {
154 if self.exp_receiver.is_some() {
157 let rx = std::mem::take(&mut self.exp_receiver).unwrap();
158 ctx.add_stream(rx);
159 } else {
160 let mut etx = self.emergency_channel.clone();
161 ctx.wait(Box::pin(
162 async move { etx.restart().await }
163 .into_actor(self)
164 .map(|message, _, ctx| {
165 match message {
166 Ok(ch) => {
167 ctx.add_stream(ch);
168 }
169 Err(err) => {
170 log::error!(
172 "Expiration channel closed and could not be recovered. {}",
173 err
174 );
175 ctx.terminate();
176 }
177 }
178 }),
179 ));
180 }
181 }
182}
183
184impl StreamHandler<Expired<ExpiryKey>> for HashMapActor {
185 fn handle(&mut self, item: Expired<ExpiryKey>, _: &mut Self::Context) {
186 let item = item.into_inner();
187 self.map
188 .get_mut(&item.scope)
189 .and_then(|scope_map| scope_map.remove(&item.key));
190 }
191}
192
193impl Handler<StoreRequest> for HashMapActor {
194 type Result = ResponseActFuture<Self, StoreResponse>;
195
196 fn handle(&mut self, msg: StoreRequest, ctx: &mut Self::Context) -> Self::Result {
197 match msg {
198 StoreRequest::Set(scope, key, value) => {
199 if self
200 .map
201 .entry(scope.clone())
202 .or_default()
203 .insert(key.clone(), value)
204 .is_some()
205 {
206 let mut exp = self.exp.clone();
208 Box::pin(
209 async move {
210 if let Err(err) = exp.remove(ExpiryKey::new(scope, key)).await {
211 log::error!("{}", err);
212 }
213 }
214 .into_actor(self)
215 .map(move |_, _, _| StoreResponse::Set(Ok(()))),
216 )
217 } else {
218 Box::pin(async { StoreResponse::Set(Ok(())) }.into_actor(self))
219 }
220 }
221 StoreRequest::Get(scope, key) => {
222 let val = self
223 .map
224 .get(&scope)
225 .and_then(|scope_map| scope_map.get(&key))
226 .cloned();
227 Box::pin(async move { StoreResponse::Get(Ok(val)) }.into_actor(self))
228 }
229 StoreRequest::Delete(scope, key) => {
230 if self
231 .map
232 .get_mut(&scope)
233 .and_then(|scope_map| scope_map.remove(&key))
234 .is_some()
235 {
236 let mut exp = self.exp.clone();
238 ctx.spawn(
239 async move {
240 if let Err(err) = exp.remove(ExpiryKey::new(scope, key)).await {
241 log::error!("{}", err);
242 }
243 }
244 .into_actor(self),
245 );
246 }
247 Box::pin(async { StoreResponse::Delete(Ok(())) }.into_actor(self))
248 }
249 StoreRequest::Contains(scope, key) => {
250 let con = self
251 .map
252 .get(&scope)
253 .map(|scope_map| scope_map.contains_key(&key))
254 .unwrap_or(false);
255 Box::pin(async move { StoreResponse::Contains(Ok(con)) }.into_actor(self))
256 }
257 }
258 }
259}
260
261impl Handler<ExpiryRequest> for HashMapActor {
262 type Result = ResponseActFuture<Self, ExpiryResponse>;
263
264 fn handle(&mut self, msg: ExpiryRequest, _: &mut Self::Context) -> Self::Result {
265 match msg {
266 ExpiryRequest::Set(scope, key, expires_in) => {
267 if self
268 .map
269 .get(&scope)
270 .map(|scope_map| scope_map.contains_key(&key))
271 .unwrap_or(false)
272 {
273 let mut exp = self.exp.clone();
274 Box::pin(
275 async move {
276 if let Err(err) = exp
277 .insert_or_update(ExpiryKey::new(scope, key), expires_in)
278 .await
279 {
280 log::error!("{}", err);
281 }
282 }
283 .into_actor(self)
284 .map(move |_, _, _| ExpiryResponse::Set(Ok(()))),
285 )
286 } else {
287 Box::pin(async { ExpiryResponse::Set(Ok(())) }.into_actor(self))
289 }
290 }
291 ExpiryRequest::Persist(scope, key) => {
292 if self
293 .map
294 .get(&scope)
295 .map(|scope_map| scope_map.contains_key(&key))
296 .unwrap_or(false)
297 {
298 let mut exp = self.exp.clone();
299 Box::pin(
300 async move {
301 if let Err(err) = exp.remove(ExpiryKey::new(scope, key)).await {
302 log::error!("{}", err);
303 }
304 }
305 .into_actor(self)
306 .map(move |_, _, _| ExpiryResponse::Persist(Ok(()))),
307 )
308 } else {
309 Box::pin(async { ExpiryResponse::Persist(Ok(())) }.into_actor(self))
310 }
311 }
312 ExpiryRequest::Get(scope, key) => {
313 let mut exp = self.exp.clone();
314 Box::pin(
315 async move {
316 match exp.get(ExpiryKey::new(scope, key)).await {
317 Ok(val) => val,
318 Err(err) => {
319 log::error!("{}", err);
320 None
321 }
322 }
323 }
324 .into_actor(self)
325 .map(|val, _, _| ExpiryResponse::Get(Ok(val))),
326 )
327 }
328 ExpiryRequest::Extend(scope, key, duration) => {
329 let mut exp = self.exp.clone();
330 Box::pin(
331 async move { exp.extend(ExpiryKey::new(scope, key), duration).await }
332 .into_actor(self)
333 .map(|_, _, _| ExpiryResponse::Extend(Ok(()))),
334 )
335 }
336 }
337 }
338}
339
340impl Handler<ExpiryStoreRequest> for HashMapActor {
341 type Result = ResponseActFuture<Self, ExpiryStoreResponse>;
342
343 fn handle(&mut self, msg: ExpiryStoreRequest, _: &mut Self::Context) -> Self::Result {
344 match msg {
345 ExpiryStoreRequest::SetExpiring(scope, key, value, expires_in) => {
346 self.map
347 .entry(scope.clone())
348 .or_default()
349 .insert(key.clone(), value);
350 let mut exp = self.exp.clone();
351 Box::pin(
352 async move {
353 exp.insert_or_update(ExpiryKey::new(scope, key), expires_in)
354 .await
355 }
356 .into_actor(self)
357 .map(move |_, _, _| ExpiryStoreResponse::SetExpiring(Ok(()))),
358 )
359 }
360 ExpiryStoreRequest::GetExpiring(scope, key) => {
361 let val = self
362 .map
363 .get(&scope)
364 .and_then(|scope_map| scope_map.get(&key))
365 .cloned();
366 if let Some(val) = val {
367 let mut exp = self.exp.clone();
368 Box::pin(
369 async move {
370 match exp.get(ExpiryKey::new(scope, key)).await {
371 Ok(val) => val,
372 Err(err) => {
373 log::error!("{}", err);
374 None
375 }
376 }
377 }
378 .into_actor(self)
379 .map(|expiry, _, _| {
380 ExpiryStoreResponse::GetExpiring(Ok(Some((val, expiry))))
381 }),
382 )
383 } else {
384 Box::pin(async { ExpiryStoreResponse::GetExpiring(Ok(None)) }.into_actor(self))
385 }
386 }
387 }
388 }
389}
390
391#[cfg(test)]
392mod test {
393 use super::*;
394 use actix_storage::tests::*;
395
396 #[test]
397 fn test_hashmap_store() {
398 test_store(Box::pin(async { HashMapActor::start_default() }));
399 }
400
401 #[test]
402 fn test_hashmap_expiry() {
403 test_expiry(
404 Box::pin(async {
405 let store = HashMapActor::start_default();
406 (store.clone(), store)
407 }),
408 2,
409 );
410 }
411
412 #[test]
413 fn test_hashmap_expiry_store() {
414 test_expiry_store(Box::pin(async { HashMapActor::start_default() }), 2);
415 }
416
417 #[test]
418 fn test_hashmap_formats() {
419 test_all_formats(Box::pin(async { HashMapActor::start_default() }));
420 }
421}