celp_sdk/async_cache/
redis_cache.rs

1//! Provides an interface to read from / update the CELP cache
2//!
//! This includes functionality to add / delete fields to / from the cache.
4
5use std::collections::HashMap;
6use std::os::unix::net::UnixStream;
7
8use prost::Message;
9use redis::aio::MultiplexedConnection;
10use redis::{AsyncCommands, Client};
11
12use crate::protobuf::se::AppInfo;
13use crate::warning;
14
15use super::{CacheError, CacheKind, CacheRaw};
16
17const CACHE_ENDPOINT: &str = "/run/celp/redis.sock";
18const CACHE_FALLBACK_ENDPOINT: &str = "redis://127.0.0.1/";
19
20/// Handle to a cache database.
21#[derive(Clone)]
22pub struct AsyncCache {
23    key: &'static str,
24    connection: MultiplexedConnection,
25}
26
27impl AsyncCache {
28    /// Create new cache handle of a given `kind`.
29    pub async fn new(kind: CacheKind) -> Result<Self, CacheError> {
30        let connection = create_connection().await?;
31
32        Ok(Self {
33            connection,
34            key: kind.key(),
35        })
36    }
37
38    pub(crate) fn with_connection(kind: CacheKind, connection: MultiplexedConnection) -> Self {
39        Self {
40            connection,
41            key: kind.key(),
42        }
43    }
44}
45
46async fn create_connection() -> Result<MultiplexedConnection, CacheError> {
47    let client = match UnixStream::connect(CACHE_ENDPOINT) {
48        Ok(_) => Client::open(format!("unix:{}", CACHE_ENDPOINT))?,
49        Err(e) => {
50            warning!(
51                "Unable to open cache connection to {}: {e:?}. Trying fallback {}",
52                CACHE_ENDPOINT,
53                CACHE_FALLBACK_ENDPOINT
54            );
55            Client::open(CACHE_FALLBACK_ENDPOINT)?
56        }
57    };
58    let connection = client.get_multiplexed_async_connection().await?;
59    Ok(connection)
60}
61
62/// Caches the application information, returns an error on failure
63///
64/// ```no_run
65/// use celp_sdk::{async_cache::publish_app_info, util::celp_app::build_app_info};
66///
67/// # tokio_test::block_on(async {
68/// let app_info = build_app_info("1.0.0").unwrap();
69/// publish_app_info(app_info).await.unwrap();
70/// # });
71/// ```
72pub async fn publish_app_info(app_info: AppInfo) -> Result<(), CacheError> {
73    let buf = app_info.encode_to_vec();
74
75    let mut cache = AsyncCache::new(CacheKind::AppInfo).await?;
76    cache.insert_field(&app_info.app_name, &buf).await?;
77
78    Ok(())
79}
80
81#[async_trait::async_trait]
82impl CacheRaw for AsyncCache {
83    async fn get_field(&mut self, field: &str) -> Result<Option<Vec<u8>>, CacheError> {
84        Ok(self.connection.hget(self.key, field).await?)
85    }
86
87    async fn get_fields(&mut self) -> Result<HashMap<String, Vec<u8>>, CacheError> {
88        Ok(self.connection.hgetall(self.key).await?)
89    }
90
91    async fn insert_field(&mut self, field: &str, value: &[u8]) -> Result<bool, CacheError> {
92        let result: i32 = self.connection.hset(self.key, field, value).await?;
93        // The function returns the number of added or updated fields.
94        // result is 1 for addition, 0 for an update, result>1 indicates
95        // unexpected additional fields added or updated.
96        let added = result > 1;
97        if added {
98            return Err(CacheError::ConstraintFailed(format!(
99                "Failed to set value for key : '{}',  field : '{}'",
100                self.key, field
101            )));
102        }
103
104        Ok(added)
105    }
106
107    async fn field_exists(&mut self, field: &str) -> Result<bool, CacheError> {
108        Ok(self.connection.hexists(self.key, field).await?)
109    }
110
111    async fn delete_field(&mut self, field: &str) -> Result<bool, CacheError> {
112        let deleted_count: i32 = self.connection.hdel(self.key, &[field]).await?;
113        Ok(deleted_count > 0)
114    }
115}