1use std::sync::Arc;
2use std::time::Duration;
3
4use actix::dev::{self, MessageResponse, ToEnvelope};
5use actix::{Actor, Addr, Handler, Message};
6
7use crate::dev::{Expiry, ExpiryStore, Store};
8use crate::error::{Result, StorageError};
9
/// Reference-counted byte slice passed as the `scope` argument of the requests in this file.
type Scope = Arc<[u8]>;
/// Reference-counted byte slice passed as the `key` argument of the requests in this file.
type Key = Arc<[u8]>;
/// Reference-counted byte slice used for stored values in the requests and replies below.
type Value = Arc<[u8]>;
13
/// Actix message describing a single store operation.
///
/// The `rtype` attribute makes [`StoreResponse`] the reply type. The `Store` implementation
/// for `Addr<T>` in this file sends one variant per trait method and panics when the reply
/// is not the [`StoreResponse`] variant of the same name.
#[derive(Debug, Message)]
#[rtype(StoreResponse)]
pub enum StoreRequest {
    /// Sent by `get`; carries its `scope` and `key` arguments.
    Get(Scope, Key),
    /// Sent by `set`; carries its `scope`, `key` and `value` arguments.
    Set(Scope, Key, Value),
    /// Sent by `delete`; carries its `scope` and `key` arguments.
    Delete(Scope, Key),
    /// Sent by `contains_key`; carries its `scope` and `key` arguments.
    Contains(Scope, Key),
}
28
29pub enum StoreResponse {
35 Get(Result<Option<Value>>),
36 Set(Result<()>),
37 Delete(Result<()>),
38 Contains(Result<bool>),
39}
40
41impl<A: Actor> MessageResponse<A, StoreRequest> for StoreResponse {
42 fn handle(
43 self,
44 _ctx: &mut <A as Actor>::Context,
45 tx: Option<dev::OneshotSender<<StoreRequest as Message>::Result>>,
46 ) {
47 if let Some(tx) = tx {
48 let _ = tx.send(self);
49 }
50 }
51}
52
53#[async_trait::async_trait]
54impl<T> Store for Addr<T>
55where
56 T: Actor + Handler<StoreRequest> + Sync + Send,
57 T::Context: ToEnvelope<T, StoreRequest>,
58{
59 async fn set(&self, scope: Scope, key: Key, value: Value) -> Result<()> {
60 match self
61 .send(StoreRequest::Set(scope, key, value))
62 .await
63 .map_err(StorageError::custom)?
64 {
65 StoreResponse::Set(val) => val,
66 _ => panic!(),
67 }
68 }
69
70 async fn delete(&self, scope: Scope, key: Key) -> Result<()> {
71 match self
72 .send(StoreRequest::Delete(scope, key))
73 .await
74 .map_err(StorageError::custom)?
75 {
76 StoreResponse::Delete(val) => val,
77 _ => panic!(),
78 }
79 }
80
81 async fn contains_key(&self, scope: Scope, key: Key) -> Result<bool> {
82 match self
83 .send(StoreRequest::Contains(scope, key))
84 .await
85 .map_err(StorageError::custom)?
86 {
87 StoreResponse::Contains(val) => val,
88 _ => panic!(),
89 }
90 }
91
92 async fn get(&self, scope: Scope, key: Key) -> Result<Option<Value>> {
93 match self
94 .send(StoreRequest::Get(scope, key))
95 .await
96 .map_err(StorageError::custom)?
97 {
98 StoreResponse::Get(val) => val,
99 _ => panic!(),
100 }
101 }
102}
103
/// Actix message describing a single expiry operation.
///
/// The `rtype` attribute makes [`ExpiryResponse`] the reply type. The `Expiry`
/// implementation for `Addr<T>` in this file sends one variant per trait method and panics
/// when the reply is not the [`ExpiryResponse`] variant of the same name.
#[derive(Debug, Message)]
#[rtype(ExpiryResponse)]
pub enum ExpiryRequest {
    /// Sent by `expire`; carries its `scope`, `key` and `expire_in` arguments.
    Set(Scope, Key, Duration),
    /// Sent by `persist`; carries its `scope` and `key` arguments.
    Persist(Scope, Key),
    /// Sent by `expiry`; carries its `scope` and `key` arguments.
    Get(Scope, Key),
    /// Sent by `extend`; carries its `scope`, `key` and `expire_in` arguments.
    Extend(Scope, Key, Duration),
}
118
119pub enum ExpiryResponse {
125 Set(Result<()>),
126 Persist(Result<()>),
127 Get(Result<Option<Duration>>),
128 Extend(Result<()>),
129}
130
131impl<A: Actor> MessageResponse<A, ExpiryRequest> for ExpiryResponse {
132 fn handle(
133 self,
134 _ctx: &mut <A as Actor>::Context,
135 tx: Option<dev::OneshotSender<<ExpiryRequest as Message>::Result>>,
136 ) {
137 if let Some(tx) = tx {
138 let _ = tx.send(self);
139 }
140 }
141}
142
143#[async_trait::async_trait]
144impl<T> Expiry for Addr<T>
145where
146 T: Actor + Handler<ExpiryRequest> + Sync + Send,
147 T::Context: ToEnvelope<T, ExpiryRequest>,
148{
149 async fn expire(&self, scope: Scope, key: Key, expire_in: Duration) -> Result<()> {
150 match self
151 .send(ExpiryRequest::Set(scope, key, expire_in))
152 .await
153 .map_err(StorageError::custom)?
154 {
155 ExpiryResponse::Set(val) => val,
156 _ => panic!(),
157 }
158 }
159
160 async fn persist(&self, scope: Scope, key: Key) -> Result<()> {
161 match self
162 .send(ExpiryRequest::Persist(scope, key))
163 .await
164 .map_err(StorageError::custom)?
165 {
166 ExpiryResponse::Persist(val) => val,
167 _ => panic!(),
168 }
169 }
170
171 async fn expiry(&self, scope: Scope, key: Key) -> Result<Option<Duration>> {
172 match self
173 .send(ExpiryRequest::Get(scope, key))
174 .await
175 .map_err(StorageError::custom)?
176 {
177 ExpiryResponse::Get(val) => val,
178 _ => panic!(),
179 }
180 }
181
182 async fn extend(&self, scope: Scope, key: Key, expire_in: Duration) -> Result<()> {
183 match self
184 .send(ExpiryRequest::Extend(scope, key, expire_in))
185 .await
186 .map_err(StorageError::custom)?
187 {
188 ExpiryResponse::Extend(val) => val,
189 _ => panic!(),
190 }
191 }
192}
193
/// Actix message describing a combined store-and-expiry operation.
///
/// The `rtype` attribute makes [`ExpiryStoreResponse`] the reply type. The `ExpiryStore`
/// implementation for `Addr<T>` in this file sends one variant per trait method and panics
/// when the reply is not the [`ExpiryStoreResponse`] variant of the same name.
#[derive(Debug, Message)]
#[rtype(ExpiryStoreResponse)]
pub enum ExpiryStoreRequest {
    /// Sent by `set_expiring`; carries its `scope`, `key`, `value` and `expire_in` arguments.
    SetExpiring(Scope, Key, Value, Duration),
    /// Sent by `get_expiring`; carries its `scope` and `key` arguments.
    GetExpiring(Scope, Key),
}
206
207pub enum ExpiryStoreResponse {
213 SetExpiring(Result<()>),
214 GetExpiring(Result<Option<(Value, Option<Duration>)>>),
215}
216
217impl<A: Actor> MessageResponse<A, ExpiryStoreRequest> for ExpiryStoreResponse {
218 fn handle(
219 self,
220 _ctx: &mut <A as Actor>::Context,
221 tx: Option<dev::OneshotSender<<ExpiryStoreRequest as Message>::Result>>,
222 ) {
223 if let Some(tx) = tx {
224 let _ = tx.send(self);
225 }
226 }
227}
228
229#[async_trait::async_trait]
230impl<T> ExpiryStore for Addr<T>
231where
232 T: Actor
233 + Handler<ExpiryStoreRequest>
234 + Handler<ExpiryRequest>
235 + Handler<StoreRequest>
236 + Sync
237 + Send,
238 T::Context: ToEnvelope<T, ExpiryStoreRequest>
239 + ToEnvelope<T, ExpiryRequest>
240 + ToEnvelope<T, StoreRequest>,
241{
242 async fn set_expiring(
243 &self,
244 scope: Scope,
245 key: Key,
246 value: Value,
247 expire_in: Duration,
248 ) -> Result<()> {
249 match self
250 .send(ExpiryStoreRequest::SetExpiring(
251 scope, key, value, expire_in,
252 ))
253 .await
254 .map_err(StorageError::custom)?
255 {
256 ExpiryStoreResponse::SetExpiring(val) => val,
257 _ => panic!(),
258 }
259 }
260
261 async fn get_expiring(
262 &self,
263 scope: Scope,
264 key: Key,
265 ) -> Result<Option<(Value, Option<Duration>)>> {
266 match self
267 .send(ExpiryStoreRequest::GetExpiring(scope, key))
268 .await
269 .map_err(StorageError::custom)?
270 {
271 ExpiryStoreResponse::GetExpiring(val) => val,
272 _ => panic!(),
273 }
274 }
275}
276
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use actix::Context;

    use super::*;
    use crate::dev::*;

    /// Stateless actor that answers every request with a canned reply.
    #[derive(Default)]
    struct TestActor;

    impl Actor for TestActor {
        type Context = Context<Self>;
    }

    impl Handler<StoreRequest> for TestActor {
        type Result = StoreResponse;

        fn handle(&mut self, msg: StoreRequest, _: &mut Self::Context) -> Self::Result {
            match msg {
                StoreRequest::Get(..) => StoreResponse::Get(Ok(None)),
                StoreRequest::Set(..) => StoreResponse::Set(Ok(())),
                // Wrong variant on purpose: `delete` must panic on a mismatched reply.
                StoreRequest::Delete(..) => StoreResponse::Get(Ok(None)),
                StoreRequest::Contains(..) => StoreResponse::Contains(Ok(true)),
            }
        }
    }

    impl Handler<ExpiryRequest> for TestActor {
        type Result = ExpiryResponse;

        fn handle(&mut self, msg: ExpiryRequest, _: &mut Self::Context) -> Self::Result {
            match msg {
                ExpiryRequest::Get(..) => ExpiryResponse::Get(Ok(None)),
                ExpiryRequest::Set(..) => ExpiryResponse::Set(Ok(())),
                ExpiryRequest::Persist(..) => ExpiryResponse::Persist(Ok(())),
                ExpiryRequest::Extend(..) => ExpiryResponse::Extend(Ok(())),
            }
        }
    }

    impl Handler<ExpiryStoreRequest> for TestActor {
        type Result = ExpiryStoreResponse;

        fn handle(&mut self, msg: ExpiryStoreRequest, _: &mut Self::Context) -> Self::Result {
            match msg {
                ExpiryStoreRequest::SetExpiring(..) => ExpiryStoreResponse::SetExpiring(Ok(())),
                ExpiryStoreRequest::GetExpiring(..) => ExpiryStoreResponse::GetExpiring(Ok(None)),
            }
        }
    }

    /// Calls every trait method through the address; matching replies must be `Ok`, and the
    /// final `delete` must hit the bare `panic!()` because the actor replies with `Get`.
    #[actix::test]
    #[should_panic(expected = "explicit panic")]
    async fn test_actor() {
        let addr = TestActor::start_default();
        let scope: Arc<[u8]> = Arc::from("scope".as_bytes());
        let key: Arc<[u8]> = Arc::from("key".as_bytes());
        let val: Arc<[u8]> = Arc::from("val".as_bytes());
        let ttl = Duration::from_secs(1);

        // Store operations whose replies match their requests.
        let stored = addr.set(scope.clone(), key.clone(), val.clone()).await;
        assert!(stored.is_ok());
        let fetched = addr.get(scope.clone(), key.clone()).await;
        assert!(fetched.is_ok());
        let present = addr.contains_key(scope.clone(), key.clone()).await;
        assert!(present.is_ok());

        // Expiry operations.
        let expired = addr.expire(scope.clone(), key.clone(), ttl).await;
        assert!(expired.is_ok());
        let remaining = addr.expiry(scope.clone(), key.clone()).await;
        assert!(remaining.is_ok());
        let persisted = addr.persist(scope.clone(), key.clone()).await;
        assert!(persisted.is_ok());
        let extended = addr.extend(scope.clone(), key.clone(), ttl).await;
        assert!(extended.is_ok());

        // Combined operations.
        let stored_expiring = addr.set_expiring(scope.clone(), key.clone(), val, ttl).await;
        assert!(stored_expiring.is_ok());
        let fetched_expiring = addr.get_expiring(scope.clone(), key.clone()).await;
        assert!(fetched_expiring.is_ok());

        // The mismatched reply for `Delete` triggers the expected panic here.
        addr.delete(scope, key).await.unwrap();
    }
}