actix_storage/
actor.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use actix::dev::{self, MessageResponse, ToEnvelope};
5use actix::{Actor, Addr, Handler, Message};
6
7use crate::dev::{Expiry, ExpiryStore, Store};
8use crate::error::{Result, StorageError};
9
/// Reference-counted byte slice used as the `scope` argument of every request.
type Scope = Arc<[u8]>;
/// Reference-counted byte slice used as the `key` argument of every request.
type Key = Arc<[u8]>;
/// Reference-counted byte slice used for values passed to or returned by the store.
type Value = Arc<[u8]>;
13
14/// Actix message for [`Store`](../trait.Store.html) requests
15///
16/// Every store methods are mirrored to an enum variant of the same name, and should
17/// result in its corresponding variant in [`StoreResponse`](enum.StoreResponse.html).
18/// [`Store`](../trait.Store.html) is automatically implemented for actors handling
19/// this message.
20#[derive(Debug, Message)]
21#[rtype(StoreResponse)]
22pub enum StoreRequest {
23    Get(Scope, Key),
24    Set(Scope, Key, Value),
25    Delete(Scope, Key),
26    Contains(Scope, Key),
27}
28
29/// Actix message reply for [`Store`](../trait.Store.html) requests
30///
31/// Every store methods are mirrored to an enum variant of the same name, and should
32/// be a result for its corresponding variant in [`StoreResponse`](enum.StoreResponse.html)
33/// Returning anything beside the requested variant, will result in panic at runtime.
34pub enum StoreResponse {
35    Get(Result<Option<Value>>),
36    Set(Result<()>),
37    Delete(Result<()>),
38    Contains(Result<bool>),
39}
40
41impl<A: Actor> MessageResponse<A, StoreRequest> for StoreResponse {
42    fn handle(
43        self,
44        _ctx: &mut <A as Actor>::Context,
45        tx: Option<dev::OneshotSender<<StoreRequest as Message>::Result>>,
46    ) {
47        if let Some(tx) = tx {
48            let _ = tx.send(self);
49        }
50    }
51}
52
53#[async_trait::async_trait]
54impl<T> Store for Addr<T>
55where
56    T: Actor + Handler<StoreRequest> + Sync + Send,
57    T::Context: ToEnvelope<T, StoreRequest>,
58{
59    async fn set(&self, scope: Scope, key: Key, value: Value) -> Result<()> {
60        match self
61            .send(StoreRequest::Set(scope, key, value))
62            .await
63            .map_err(StorageError::custom)?
64        {
65            StoreResponse::Set(val) => val,
66            _ => panic!(),
67        }
68    }
69
70    async fn delete(&self, scope: Scope, key: Key) -> Result<()> {
71        match self
72            .send(StoreRequest::Delete(scope, key))
73            .await
74            .map_err(StorageError::custom)?
75        {
76            StoreResponse::Delete(val) => val,
77            _ => panic!(),
78        }
79    }
80
81    async fn contains_key(&self, scope: Scope, key: Key) -> Result<bool> {
82        match self
83            .send(StoreRequest::Contains(scope, key))
84            .await
85            .map_err(StorageError::custom)?
86        {
87            StoreResponse::Contains(val) => val,
88            _ => panic!(),
89        }
90    }
91
92    async fn get(&self, scope: Scope, key: Key) -> Result<Option<Value>> {
93        match self
94            .send(StoreRequest::Get(scope, key))
95            .await
96            .map_err(StorageError::custom)?
97        {
98            StoreResponse::Get(val) => val,
99            _ => panic!(),
100        }
101    }
102}
103
104/// Actix message for [`Expiry`](../trait.Expiry.html) requests
105///
106/// Every store methods are mirrored to an enum variant of the same name, and should
107/// result in its corresponding variant in [`ExpiryResponse`](enum.ExpiryResponse.html).
108/// [`Expiry`](../trait.Expiry.html) is automatically implemented for actors handling
109/// this message.
110#[derive(Debug, Message)]
111#[rtype(ExpiryResponse)]
112pub enum ExpiryRequest {
113    Set(Scope, Key, Duration),
114    Persist(Scope, Key),
115    Get(Scope, Key),
116    Extend(Scope, Key, Duration),
117}
118
119/// Actix message reply for [`Expiry`](../trait.Expiry.html) requests
120///
121/// Every store methods are mirrored to an enum variant of the same name, and should
122/// be a result for its corresponding variant in [`ExpiryRequest`](enum.ExpiryRequest.html)
123/// Returning anything beside the requested variant, will result in panic at runtime.
124pub enum ExpiryResponse {
125    Set(Result<()>),
126    Persist(Result<()>),
127    Get(Result<Option<Duration>>),
128    Extend(Result<()>),
129}
130
131impl<A: Actor> MessageResponse<A, ExpiryRequest> for ExpiryResponse {
132    fn handle(
133        self,
134        _ctx: &mut <A as Actor>::Context,
135        tx: Option<dev::OneshotSender<<ExpiryRequest as Message>::Result>>,
136    ) {
137        if let Some(tx) = tx {
138            let _ = tx.send(self);
139        }
140    }
141}
142
143#[async_trait::async_trait]
144impl<T> Expiry for Addr<T>
145where
146    T: Actor + Handler<ExpiryRequest> + Sync + Send,
147    T::Context: ToEnvelope<T, ExpiryRequest>,
148{
149    async fn expire(&self, scope: Scope, key: Key, expire_in: Duration) -> Result<()> {
150        match self
151            .send(ExpiryRequest::Set(scope, key, expire_in))
152            .await
153            .map_err(StorageError::custom)?
154        {
155            ExpiryResponse::Set(val) => val,
156            _ => panic!(),
157        }
158    }
159
160    async fn persist(&self, scope: Scope, key: Key) -> Result<()> {
161        match self
162            .send(ExpiryRequest::Persist(scope, key))
163            .await
164            .map_err(StorageError::custom)?
165        {
166            ExpiryResponse::Persist(val) => val,
167            _ => panic!(),
168        }
169    }
170
171    async fn expiry(&self, scope: Scope, key: Key) -> Result<Option<Duration>> {
172        match self
173            .send(ExpiryRequest::Get(scope, key))
174            .await
175            .map_err(StorageError::custom)?
176        {
177            ExpiryResponse::Get(val) => val,
178            _ => panic!(),
179        }
180    }
181
182    async fn extend(&self, scope: Scope, key: Key, expire_in: Duration) -> Result<()> {
183        match self
184            .send(ExpiryRequest::Extend(scope, key, expire_in))
185            .await
186            .map_err(StorageError::custom)?
187        {
188            ExpiryResponse::Extend(val) => val,
189            _ => panic!(),
190        }
191    }
192}
193
194/// Actix message for [`ExpiryStore`](../trait.ExpiryStore.html) requests
195///
196/// Every store methods are mirrored to an enum variant of the same name, and should
197/// result in its corresponding variant in [`ExpiryStoreResponse`](enum.ExpiryStoreResponse.html).
198/// [`ExpiryStore`](../trait.ExpiryStore.html) is automatically implemented for actors handling
199/// this message.
200#[derive(Debug, Message)]
201#[rtype(ExpiryStoreResponse)]
202pub enum ExpiryStoreRequest {
203    SetExpiring(Scope, Key, Value, Duration),
204    GetExpiring(Scope, Key),
205}
206
207/// Actix message reply for [`ExpiryStore`](../trait.ExpiryStore.html) requests
208///
209/// Every store methods are mirrored to an enum variant of the same name, and should
210/// be a result for its corresponding variant in [`ExpiryStoreResponse`](enum.ExpiryStoreResponse.html)
211/// Returning anything beside the requested variant, will result in panic at runtime.
212pub enum ExpiryStoreResponse {
213    SetExpiring(Result<()>),
214    GetExpiring(Result<Option<(Value, Option<Duration>)>>),
215}
216
217impl<A: Actor> MessageResponse<A, ExpiryStoreRequest> for ExpiryStoreResponse {
218    fn handle(
219        self,
220        _ctx: &mut <A as Actor>::Context,
221        tx: Option<dev::OneshotSender<<ExpiryStoreRequest as Message>::Result>>,
222    ) {
223        if let Some(tx) = tx {
224            let _ = tx.send(self);
225        }
226    }
227}
228
229#[async_trait::async_trait]
230impl<T> ExpiryStore for Addr<T>
231where
232    T: Actor
233        + Handler<ExpiryStoreRequest>
234        + Handler<ExpiryRequest>
235        + Handler<StoreRequest>
236        + Sync
237        + Send,
238    T::Context: ToEnvelope<T, ExpiryStoreRequest>
239        + ToEnvelope<T, ExpiryRequest>
240        + ToEnvelope<T, StoreRequest>,
241{
242    async fn set_expiring(
243        &self,
244        scope: Scope,
245        key: Key,
246        value: Value,
247        expire_in: Duration,
248    ) -> Result<()> {
249        match self
250            .send(ExpiryStoreRequest::SetExpiring(
251                scope, key, value, expire_in,
252            ))
253            .await
254            .map_err(StorageError::custom)?
255        {
256            ExpiryStoreResponse::SetExpiring(val) => val,
257            _ => panic!(),
258        }
259    }
260
261    async fn get_expiring(
262        &self,
263        scope: Scope,
264        key: Key,
265    ) -> Result<Option<(Value, Option<Duration>)>> {
266        match self
267            .send(ExpiryStoreRequest::GetExpiring(scope, key))
268            .await
269            .map_err(StorageError::custom)?
270        {
271            ExpiryStoreResponse::GetExpiring(val) => val,
272            _ => panic!(),
273        }
274    }
275}
276
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use actix::Context;

    use super::*;
    use crate::dev::*;

    /// Stateless actor that answers every request with a canned reply.
    #[derive(Default)]
    struct TestActor;

    impl Actor for TestActor {
        type Context = Context<Self>;
    }

    impl Handler<StoreRequest> for TestActor {
        type Result = StoreResponse;

        fn handle(&mut self, msg: StoreRequest, _: &mut Self::Context) -> Self::Result {
            match msg {
                StoreRequest::Get(..) => StoreResponse::Get(Ok(None)),
                StoreRequest::Set(..) => StoreResponse::Set(Ok(())),
                // Deliberately the wrong variant: the test relies on `delete`
                // panicking when the reply does not match the request.
                StoreRequest::Delete(..) => StoreResponse::Get(Ok(None)),
                StoreRequest::Contains(..) => StoreResponse::Contains(Ok(true)),
            }
        }
    }

    impl Handler<ExpiryRequest> for TestActor {
        type Result = ExpiryResponse;

        fn handle(&mut self, msg: ExpiryRequest, _: &mut Self::Context) -> Self::Result {
            match msg {
                ExpiryRequest::Get(..) => ExpiryResponse::Get(Ok(None)),
                ExpiryRequest::Set(..) => ExpiryResponse::Set(Ok(())),
                ExpiryRequest::Persist(..) => ExpiryResponse::Persist(Ok(())),
                ExpiryRequest::Extend(..) => ExpiryResponse::Extend(Ok(())),
            }
        }
    }

    impl Handler<ExpiryStoreRequest> for TestActor {
        type Result = ExpiryStoreResponse;

        fn handle(&mut self, msg: ExpiryStoreRequest, _: &mut Self::Context) -> Self::Result {
            match msg {
                ExpiryStoreRequest::SetExpiring(..) => ExpiryStoreResponse::SetExpiring(Ok(())),
                ExpiryStoreRequest::GetExpiring(..) => ExpiryStoreResponse::GetExpiring(Ok(None)),
            }
        }
    }

    /// Calls every trait method through the actor address; all of them succeed
    /// except the final `delete`, whose mismatched reply must trigger the panic.
    #[actix::test]
    #[should_panic(expected = "explicit panic")]
    async fn test_actor() {
        let actor = TestActor::start_default();
        let scope: Arc<[u8]> = "scope".as_bytes().into();
        let key: Arc<[u8]> = "key".as_bytes().into();
        let val: Arc<[u8]> = "val".as_bytes().into();
        let dur = Duration::from_secs(1);

        // `Store` methods
        let set = actor
            .set(Arc::clone(&scope), Arc::clone(&key), Arc::clone(&val))
            .await;
        assert!(set.is_ok());
        let get = actor.get(Arc::clone(&scope), Arc::clone(&key)).await;
        assert!(get.is_ok());
        let contains = actor
            .contains_key(Arc::clone(&scope), Arc::clone(&key))
            .await;
        assert!(contains.is_ok());

        // `Expiry` methods
        let expire = actor.expire(Arc::clone(&scope), Arc::clone(&key), dur).await;
        assert!(expire.is_ok());
        let expiry = actor.expiry(Arc::clone(&scope), Arc::clone(&key)).await;
        assert!(expiry.is_ok());
        let persist = actor.persist(Arc::clone(&scope), Arc::clone(&key)).await;
        assert!(persist.is_ok());
        let extend = actor.extend(Arc::clone(&scope), Arc::clone(&key), dur).await;
        assert!(extend.is_ok());

        // `ExpiryStore` methods
        let set_expiring = actor
            .set_expiring(Arc::clone(&scope), Arc::clone(&key), val, dur)
            .await;
        assert!(set_expiring.is_ok());
        let get_expiring = actor
            .get_expiring(Arc::clone(&scope), Arc::clone(&key))
            .await;
        assert!(get_expiring.is_ok());

        // should panic here
        actor.delete(scope, key).await.unwrap();
    }
}