sqs_lambda/
redis_cache.rs

1use std::time::Duration;
2
3use darkredis::ConnectionPool;
4use darkredis::Error as RedisError;
5
6use log::warn;
7
8use async_trait::async_trait;
9
10use crate::cache::{Cache, CacheResponse, Cacheable};
11
12pub struct RedisCache {
13    address: String,
14    connection_pool: ConnectionPool,
15}
16
17impl Clone for RedisCache {
18    fn clone(&self) -> Self {
19        Self {
20            address: self.address.clone(),
21            connection_pool: self.connection_pool.clone(),
22        }
23    }
24}
25
26impl RedisCache {
27    pub async fn new(address: String) -> Result<Self, RedisError> {
28        let connection_pool =
29            ConnectionPool::create(address.clone(), None, num_cpus::get()).await?;
30
31        Ok(Self {
32            connection_pool,
33            address,
34        })
35    }
36}
37
38#[async_trait]
39impl Cache for RedisCache {
40    #[tracing::instrument(skip(self, cacheable))]
41    async fn get<CA>(&mut self, cacheable: CA) -> Result<CacheResponse, crate::error::Error>
42    where
43        CA: Cacheable + Send + Sync + 'static,
44    {
45        let identity = cacheable.identity();
46        let identity = hex::encode(identity);
47        //
48        let mut client = self.connection_pool.get().await;
49
50        let res = tokio::time::timeout(Duration::from_millis(200), client.exists(&identity)).await;
51
52        let res = match res {
53            Ok(res) => res,
54            Err(e) => {
55                warn!("Cache lookup failed with: {:?}", e);
56                return Ok(CacheResponse::Miss);
57            }
58        };
59
60        match res {
61            Ok(true) => Ok(CacheResponse::Hit),
62            Ok(false) => Ok(CacheResponse::Miss),
63            Err(e) => {
64                warn!("Cache lookup failed with: {:?}", e);
65                Ok(CacheResponse::Miss)
66            }
67        }
68    }
69
70    #[tracing::instrument(skip(self, identity))]
71    async fn store(&mut self, identity: Vec<u8>) -> Result<(), crate::error::Error> {
72        let identity = hex::encode(identity);
73
74        let mut client = self.connection_pool.get().await;
75
76        let res = tokio::time::timeout(
77            Duration::from_millis(200),
78            client.set_and_expire_seconds(&identity, b"1", 16 * 60),
79        )
80        .await;
81
82        res.map_err(|err| crate::error::Error::CacheError(format!("{}", err)))?
83            .map_err(|err| crate::error::Error::CacheError(format!("{}", err)))?;
84
85        Ok(())
86    }
87}