actix_storage_dashmap/
actor.rs1use std::sync::{atomic::AtomicBool, Arc};
2use std::time::{Duration, Instant};
3
4use actix::{Actor, Addr, Handler, SyncArbiter, SyncContext};
5use actix_storage::dev::actor::{
6 ExpiryRequest, ExpiryResponse, ExpiryStoreRequest, ExpiryStoreResponse, StoreRequest,
7 StoreResponse,
8};
9use dashmap::DashMap;
10use delay_queue::{Delay, DelayQueue};
11
12struct Value {
14 bytes: Arc<[u8]>,
15 timeout: Option<Instant>,
16 persist: bool,
17 nonce: usize,
19}
20
21impl Value {
22 pub fn new(bytes: Arc<[u8]>, nonce: usize) -> Self {
23 Value {
24 bytes,
25 timeout: None,
26 persist: true,
27 nonce,
28 }
29 }
30
31 pub fn new_expiring(bytes: Arc<[u8]>, nonce: usize, expires_in: Duration) -> Self {
32 Value {
33 bytes,
34 timeout: Some(Instant::now() + expires_in),
35 persist: false,
36 nonce,
37 }
38 }
39
40 pub fn expires_in(&self) -> Option<Duration> {
41 if self.persist == true {
42 None
43 } else {
44 self.timeout
45 .and_then(|timeout| timeout.checked_duration_since(Instant::now()))
46 }
47 }
48
49 pub fn set_expires_in(&mut self, expires_in: Duration) -> Instant {
50 let timeout = Instant::now() + expires_in;
51 self.persist = false;
52 self.timeout = Some(timeout);
53 self.increase_nonce();
54 timeout
55 }
56
57 pub fn extend_expires_in(&mut self, expires_in: Duration) -> Instant {
58 if let Some(timeout) = self.timeout {
59 let new_timeout = timeout + expires_in;
60 self.persist = false;
61 self.timeout = Some(new_timeout);
62 self.increase_nonce();
63 new_timeout
64 } else {
65 self.set_expires_in(expires_in)
66 }
67 }
68
69 fn increase_nonce(&mut self) {
70 self.nonce = self.nonce.checked_add(1).unwrap_or(0);
71 }
72
73 pub fn persist(&mut self) {
74 self.persist = true;
75 }
76}
77
78type ScopeMap = DashMap<Arc<[u8]>, Value>;
79type InternalMap = DashMap<Arc<[u8]>, ScopeMap>;
80type ExpiringKey = (Arc<[u8]>, Arc<[u8]>, usize);
82
83#[derive(Clone, Default)]
112pub struct DashMapActor {
113 map: Arc<InternalMap>,
114 queue: DelayQueue<Delay<ExpiringKey>>,
115
116 #[doc(hidden)]
117 stopped: Arc<AtomicBool>,
118}
119
120impl DashMapActor {
121 #[must_use = "Actor should be started to work by calling `start`"]
123 pub fn new() -> Self {
124 Self::default()
125 }
126
127 #[must_use = "Actor should be started to work by calling `start`"]
129 pub fn with_capacity(capacity: usize) -> Self {
130 Self {
131 map: DashMap::with_capacity(capacity).into(),
132 queue: DelayQueue::default(),
133 stopped: Arc::new(AtomicBool::new(false)),
134 }
135 }
136
137 pub fn start_default(threads_num: usize) -> Addr<Self> {
140 let storage = Self::default();
141 SyncArbiter::start(threads_num, move || storage.clone())
142 }
143
144 pub fn start(self, threads_num: usize) -> Addr<Self> {
146 SyncArbiter::start(threads_num, move || self.clone())
147 }
148}
149
150impl Actor for DashMapActor {
151 type Context = SyncContext<Self>;
152
153 fn started(&mut self, _: &mut Self::Context) {
154 let map = self.map.clone();
155 let mut queue = self.queue.clone();
156
157 let stopped = self.stopped.clone();
158
159 std::thread::spawn(move || loop {
160 if let Some(item) = queue.try_pop_for(Duration::from_secs(1)) {
161 let mut should_delete = false;
162 let scope = &item.value.0;
163 let key = &item.value.1;
164 let nonce = item.value.2;
165 if let Some(scope_map) = map.get_mut(scope) {
166 if let Some(value) = scope_map.get(key) {
167 if value.nonce != nonce {
168 continue;
169 }
170
171 if !value.persist {
172 should_delete = true;
173 }
174 }
175 };
176 if should_delete {
177 map.get_mut(scope)
178 .and_then(|scope_map| scope_map.remove(key));
179 }
180 } else if stopped.load(std::sync::atomic::Ordering::Relaxed) {
181 break;
182 }
183 });
184 }
185}
186
187impl Handler<StoreRequest> for DashMapActor {
188 type Result = StoreResponse;
189
190 fn handle(&mut self, msg: StoreRequest, _: &mut Self::Context) -> Self::Result {
191 match msg {
192 StoreRequest::Set(scope, key, value) => {
193 self.map
194 .entry(scope)
195 .or_default()
196 .entry(key)
197 .and_modify(|val| {
198 val.nonce += 1;
199 val.bytes = value.clone();
200 })
201 .or_insert_with(|| Value::new(value, 0));
202 StoreResponse::Set(Ok(()))
203 }
204 StoreRequest::Get(scope, key) => {
205 let value = if let Some(scope_map) = self.map.get(&scope) {
206 scope_map.get(&key).map(|val| val.bytes.clone())
207 } else {
208 None
209 };
210 StoreResponse::Get(Ok(value))
211 }
212 StoreRequest::Delete(scope, key) => {
213 self.map
214 .get_mut(&scope)
215 .and_then(|scope_map| scope_map.remove(&key));
216 StoreResponse::Delete(Ok(()))
217 }
218 StoreRequest::Contains(scope, key) => {
219 let contains = self
220 .map
221 .get(&scope)
222 .map(|scope_map| scope_map.contains_key(&key))
223 .unwrap_or(false);
224 StoreResponse::Contains(Ok(contains))
225 }
226 }
227 }
228}
229
230impl Handler<ExpiryRequest> for DashMapActor {
231 type Result = ExpiryResponse;
232
233 fn handle(&mut self, msg: ExpiryRequest, _: &mut Self::Context) -> Self::Result {
234 match msg {
235 ExpiryRequest::Set(scope, key, expires_in) => {
236 if let Some(scope_map) = self.map.get_mut(&scope) {
237 if let Some(mut val) = scope_map.get_mut(&key) {
238 let timeout = val.set_expires_in(expires_in);
239 self.queue
240 .push(Delay::until_instant((scope, key, val.nonce), timeout));
241 }
242 }
243 ExpiryResponse::Set(Ok(()))
244 }
245 ExpiryRequest::Persist(scope, key) => {
246 if let Some(scope_map) = self.map.get_mut(&scope) {
247 if let Some(mut val) = scope_map.get_mut(&key) {
248 val.persist();
249 }
250 }
251 ExpiryResponse::Persist(Ok(()))
252 }
253 ExpiryRequest::Get(scope, key) => {
254 let item = if let Some(scope_map) = self.map.get(&scope) {
255 scope_map.get(&key).and_then(|val| val.expires_in())
256 } else {
257 None
258 };
259 ExpiryResponse::Get(Ok(item))
260 }
261 ExpiryRequest::Extend(scope, key, duration) => {
262 if let Some(scope_map) = self.map.get_mut(&scope) {
263 if let Some(mut val) = scope_map.get_mut(&key) {
264 let new_timeout = val.extend_expires_in(duration);
265 self.queue
266 .push(Delay::until_instant((scope, key, val.nonce), new_timeout));
267 }
268 }
269 ExpiryResponse::Extend(Ok(()))
270 }
271 }
272 }
273}
274
275impl Handler<ExpiryStoreRequest> for DashMapActor {
276 type Result = ExpiryStoreResponse;
277
278 fn handle(&mut self, msg: ExpiryStoreRequest, _: &mut Self::Context) -> Self::Result {
279 match msg {
280 ExpiryStoreRequest::SetExpiring(scope, key, value, expires_in) => {
281 let scope_map = self.map.entry(scope.clone()).or_default();
282 let val = scope_map
283 .entry(key.clone())
284 .and_modify(|val| {
285 val.nonce += 1;
286 val.bytes = value.clone();
287 val.set_expires_in(expires_in);
288 })
289 .or_insert_with(|| Value::new_expiring(value, 0, expires_in));
290 self.queue
291 .push(Delay::for_duration((scope, key, val.nonce), expires_in));
292 ExpiryStoreResponse::SetExpiring(Ok(()))
293 }
294 ExpiryStoreRequest::GetExpiring(scope, key) => {
295 let values = if let Some(scope_map) = self.map.get(&scope) {
296 scope_map
297 .get(&key)
298 .map(|val| (val.bytes.clone(), val.expires_in()))
299 } else {
300 None
301 };
302
303 ExpiryStoreResponse::GetExpiring(Ok(values))
304 }
305 }
306 }
307}
308
309#[cfg(test)]
310mod test {
311 use super::*;
312 use actix_storage::tests::*;
313
314 #[test]
315 fn test_dashmap_store() {
316 test_store(Box::pin(async { DashMapActor::default().start(1) }));
317 }
318
319 #[test]
320 fn test_dashmap_expiry() {
321 test_expiry(
322 Box::pin(async {
323 let store = DashMapActor::default().start(1);
324 (store.clone(), store)
325 }),
326 2,
327 );
328 }
329
330 #[test]
331 fn test_dashmap_expiry_store() {
332 test_expiry_store(
333 Box::pin(async {
334 let store = DashMapActor::default().start(1);
335 store
336 }),
337 2,
338 );
339 }
340
341 #[test]
342 fn test_dashmap_formats() {
343 test_all_formats(Box::pin(async {
344 let store = DashMapActor::default().start(1);
345 store
346 }));
347 }
348}