1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
//! [`hitbox::runtime::RuntimeAdapter`] implementation for the Actix runtime.
use actix::dev::{MessageResponse, ToEnvelope};
use actix::{Actor, Addr, Handler, Message};
use serde::de::DeserializeOwned;
use serde::Serialize;
use tracing::warn;

use hitbox::response::CacheableResponse;
use hitbox::runtime::{AdapterResult, EvictionPolicy, RuntimeAdapter, TtlSettings};
use hitbox::{CacheError, CacheState, Cacheable, CachedValue};
use hitbox_backend::{Backend, Get, Set};

use crate::QueryCache;

/// [`RuntimeAdapter`] for Actix runtime.
///
/// Bundles a cacheable query together with the address of the cache backend
/// actor and the cache settings that [`ActixAdapter::new`] reads from the
/// message at construction time.
pub struct ActixAdapter<A, M, B>
where
    A: Actor + Handler<M>,
    M: Message + Cacheable + Send,
    M::Result: MessageResponse<A, M> + Send,
    B: Backend,
{
    // Wrapped in `Option` so that `poll_upstream` can move the query out with
    // `take()`; it is `None` once the query has been handed to the upstream.
    message: Option<QueryCache<A, M>>,
    // Key used for both `Get` and `Set` requests to the backend.
    cache_key: String,
    // TTL passed to the backend on `Set` and reported by `eviction_settings`.
    cache_ttl: u32,
    // Stale TTL reported by `eviction_settings`.
    cache_stale_ttl: u32,
    // Address of the backend actor that stores the cached values.
    backend: Addr<B>,
}

impl<A, M, B> ActixAdapter<A, M, B>
where
    A: Actor + Handler<M>,
    M: Message + Cacheable + Send,
    M::Result: MessageResponse<A, M> + Send,
    B: Backend,
{
    /// Creates new instance of Actix runtime adapter.
    ///
    /// The cache key, TTL and stale TTL are read from `message` once, here,
    /// and stored in the adapter; `backend` is the address of the actor that
    /// will serve the cache `Get`/`Set` requests.
    ///
    /// # Errors
    ///
    /// Returns the [`CacheError`] produced by `message.cache_key()` when the
    /// cache key cannot be generated.
    pub fn new(message: QueryCache<A, M>, backend: Addr<B>) -> Result<Self, CacheError> {
        let cache_key = message.cache_key()?;
        let cache_ttl = message.message.cache_ttl();
        // The stale TTL is a separate `Cacheable` setting. Reading `cache_ttl()`
        // here would make the stale window always equal to the TTL.
        let cache_stale_ttl = message.message.cache_stale_ttl();
        Ok(Self {
            message: Some(message),
            backend,
            cache_key,
            cache_ttl,
            cache_stale_ttl,
        })
    }
}

impl<A, M, T, B, U> RuntimeAdapter for ActixAdapter<A, M, B>
where
    A: Actor + Handler<M>,
    A::Context: ToEnvelope<A, M>,
    M: Message<Result = T> + Cacheable + Send + 'static,
    M::Result: MessageResponse<A, M> + Send,
    B: Backend,
    <B as Actor>::Context: ToEnvelope<B, Get> + ToEnvelope<B, Set>,
    T: CacheableResponse<Cached = U> + 'static,
    U: DeserializeOwned + Serialize,
{
    type UpstreamResult = T;

    fn poll_upstream(&mut self) -> AdapterResult<Self::UpstreamResult> {
        let message = self.message.take();
        Box::pin(async move {
            let message = message.ok_or_else(|| {
                CacheError::CacheKeyGenerationError("Message already sent to upstream".to_owned())
            })?;
            Ok(message.upstream.send(message.message).await?)
        })
    }

    fn poll_cache(&self) -> AdapterResult<CacheState<Self::UpstreamResult>> {
        let backend = self.backend.clone();
        let cache_key = self.cache_key.clone();
        Box::pin(async move {
            let cached_value = backend.send(Get { key: cache_key }).await??;
            CacheState::from_bytes(cached_value.as_ref())
        })
    }

    fn update_cache(&self, cached_value: &CachedValue<Self::UpstreamResult>) -> AdapterResult<()> {
        let serialized = cached_value.serialize();
        let ttl = self.cache_ttl;
        let backend = self.backend.clone();
        let cache_key = self.cache_key.clone();
        Box::pin(async move {
            let serialized = serialized?;
            let _ = backend
                .send(Set {
                    key: cache_key,
                    value: serialized,
                    ttl: Some(ttl),
                })
                .await
                .map_err(|error| warn!("Updating Cache Error {}", error))
                .and_then(|value| value.map_err(|error| warn!("Updating Cache Error. {}", error)));
            Ok(())
        })
    }
    fn eviction_settings(&self) -> EvictionPolicy {
        let ttl_settings = TtlSettings {
            ttl: self.cache_ttl,
            stale_ttl: self.cache_stale_ttl,
        };
        EvictionPolicy::Ttl(ttl_settings)
    }
}