1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
use std::fmt::Debug;
use std::time::Duration;

use darkredis::Connection;
use darkredis::ConnectionPool;
use darkredis::Error as RedisError;
use futures::compat::Future01CompatExt;
use log::warn;

use async_trait::async_trait;

use crate::cache::{Cache, Cacheable, CacheResponse};

/// Cache backend that keeps its entries in Redis.
///
/// Cloning produces another handle holding a clone of the connection pool
/// and of the address string.
#[derive(Clone)]
pub struct RedisCache {
    /// Pool from which every cache operation obtains its Redis connection.
    connection_pool: ConnectionPool,
    /// Address string that was handed to the pool when it was created.
    address: String,
}

impl RedisCache {
    /// Creates a cache backed by a Redis connection pool for `address`.
    ///
    /// The pool is created with `None` as its second argument (presumably
    /// "no password" — confirm against the darkredis API) and the value of
    /// `num_cpus::get()` as its third argument (presumably the number of
    /// pooled connections — confirm against the darkredis API).
    ///
    /// # Errors
    ///
    /// Returns the `RedisError` produced by `ConnectionPool::create` when the
    /// pool cannot be set up.
    pub async fn new(address: String) -> Result<Self, RedisError> {
        let pool_size = num_cpus::get();

        ConnectionPool::create(address.clone(), None, pool_size)
            .await
            .map(|connection_pool| Self { address, connection_pool })
    }
}

#[async_trait]
impl<E> Cache<E> for RedisCache
    where
        E: Debug + Clone + Send + Sync + 'static,
{
    async fn get<CA>(&mut self, cacheable: CA) -> Result<CacheResponse, crate::error::Error<E>>
        where
            CA: Cacheable + Send + Sync + 'static,
    {
        let identity = cacheable.identity();
        let identity = hex::encode(identity);
//
        let mut client = self.connection_pool.get().await;

        let res = tokio::time::timeout(
            Duration::from_millis(200),
            client.exists(&identity),
        ).await;

        let res = match res {
            Ok(res) => res,
            Err(e) => {
                warn!("Cache lookup failed with: {:?}", e);
                return Ok(CacheResponse::Miss);
            }
        };

        match res {
            Ok(true) => Ok(CacheResponse::Hit),
            Ok(false) => Ok(CacheResponse::Miss),
            Err(e) => {
                warn!("Cache lookup failed with: {:?}", e);
                Ok(CacheResponse::Miss)
            }
        }
    }

    async fn store(&mut self, identity: Vec<u8>) -> Result<(), crate::error::Error<E>>
    {
        let identity = hex::encode(identity);

        let mut client = self.connection_pool.get().await;

        let res = tokio::time::timeout(
            Duration::from_millis(200),
            client.set_and_expire_seconds(&identity, b"1", 16 * 60),
        ).await;

        res
            .map_err(|err| crate::error::Error::CacheError(format!("{}", err)))?
            .map_err(|err| crate::error::Error::CacheError(format!("{}", err)))?;


        Ok(())
    }
}