celp_sdk/async_cache/
mod.rs

1mod redis_cache;
2
3use std::collections::HashMap;
4
5use crate::protobuf::{build_system_event_source, unpack_system_event_raw};
6
7pub use self::redis_cache::*;
8
9#[derive(thiserror::Error, Debug)]
10pub enum CacheError {
11    #[error("broker error")]
12    BrokerError(#[from] redis::RedisError),
13    #[error("deserialization failed")]
14    DecodeError(#[from] prost::DecodeError),
15    #[error("constraint failed: {0}")]
16    ConstraintFailed(String),
17}
18
/// Cache kind; specifies what kind of messages a cache is going to hold.
///
/// The enum is a plain, fieldless tag, so it is cheap to copy and can be
/// compared, hashed and used as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CacheKind {
    /// Cache holding system event messages.
    SystemEvent,
    /// Cache holding application info messages.
    AppInfo,
}
25
26impl CacheKind {
27    fn key(&self) -> &'static str {
28        match self {
29            CacheKind::SystemEvent => "celp:broker_cache",
30            CacheKind::AppInfo => "celp:app_info",
31        }
32    }
33}
34
35/// Exposes a "raw" cache interface, i.e. based on byte blobs rather than typed messages.
36#[async_trait::async_trait]
37pub trait CacheRaw {
38    /// Retrieves a `field`'s value from the cache.
39    async fn get_field(&mut self, field: &str) -> Result<Option<Vec<u8>>, CacheError>;
40
41    /// Retrieves all fields and their values for a given `key` from the cache.
42    async fn get_fields(&mut self) -> Result<HashMap<String, Vec<u8>>, CacheError>;
43
44    /// Adds a `field` with given `value` to the cache.
45    /// Returns `true` if an element was added,`false` if it existed before and was updated.
46    async fn insert_field(&mut self, field: &str, value: &[u8]) -> Result<bool, CacheError>;
47
48    /// Checks if a specific `field` exists in the cache.
49    async fn field_exists(&mut self, field: &str) -> Result<bool, CacheError>;
50
51    /// Deletes `field` from the cache. Returns `true` if that entry existed.
52    async fn delete_field(&mut self, field: &str) -> Result<bool, CacheError>;
53}
54
55/// Cache interface for retrieving system events with automatic deserialization.
56#[async_trait::async_trait]
57pub trait CacheSystemEvent {
58    async fn get_cached_system_event<E: prost::Message + prost::Name + Default>(
59        &mut self,
60    ) -> Result<Option<E>, CacheError>;
61}
62
63#[async_trait::async_trait]
64impl<T> CacheSystemEvent for T
65where
66    T: CacheRaw + Send,
67{
68    async fn get_cached_system_event<E: prost::Message + prost::Name + Default>(
69        &mut self,
70    ) -> Result<Option<E>, CacheError> {
71        match self.get_field(&build_system_event_source::<E>()).await? {
72            Some(data) => Ok(Some(unpack_system_event_raw::<E>(&data)?)),
73            None => Ok(None),
74        }
75    }
76}