celp_sdk/async_cache/
redis_cache.rs1use std::collections::HashMap;
6use std::os::unix::net::UnixStream;
7
8use prost::Message;
9use redis::aio::MultiplexedConnection;
10use redis::{AsyncCommands, Client};
11
12use crate::protobuf::se::AppInfo;
13use crate::warning;
14
15use super::{CacheError, CacheKind, CacheRaw};
16
/// Path of the Unix domain socket that is tried first when connecting to the cache.
const CACHE_ENDPOINT: &str = "/run/celp/redis.sock";

/// Redis URL used when the Unix socket at [`CACHE_ENDPOINT`] cannot be opened.
const CACHE_FALLBACK_ENDPOINT: &str = "redis://127.0.0.1/";
19
20#[derive(Clone)]
22pub struct AsyncCache {
23 key: &'static str,
24 connection: MultiplexedConnection,
25}
26
27impl AsyncCache {
28 pub async fn new(kind: CacheKind) -> Result<Self, CacheError> {
30 let connection = create_connection().await?;
31
32 Ok(Self {
33 connection,
34 key: kind.key(),
35 })
36 }
37
38 pub(crate) fn with_connection(kind: CacheKind, connection: MultiplexedConnection) -> Self {
39 Self {
40 connection,
41 key: kind.key(),
42 }
43 }
44}
45
46async fn create_connection() -> Result<MultiplexedConnection, CacheError> {
47 let client = match UnixStream::connect(CACHE_ENDPOINT) {
48 Ok(_) => Client::open(format!("unix:{}", CACHE_ENDPOINT))?,
49 Err(e) => {
50 warning!(
51 "Unable to open cache connection to {}: {e:?}. Trying fallback {}",
52 CACHE_ENDPOINT,
53 CACHE_FALLBACK_ENDPOINT
54 );
55 Client::open(CACHE_FALLBACK_ENDPOINT)?
56 }
57 };
58 let connection = client.get_multiplexed_async_connection().await?;
59 Ok(connection)
60}
61
62pub async fn publish_app_info(app_info: AppInfo) -> Result<(), CacheError> {
73 let buf = app_info.encode_to_vec();
74
75 let mut cache = AsyncCache::new(CacheKind::AppInfo).await?;
76 cache.insert_field(&app_info.app_name, &buf).await?;
77
78 Ok(())
79}
80
81#[async_trait::async_trait]
82impl CacheRaw for AsyncCache {
83 async fn get_field(&mut self, field: &str) -> Result<Option<Vec<u8>>, CacheError> {
84 Ok(self.connection.hget(self.key, field).await?)
85 }
86
87 async fn get_fields(&mut self) -> Result<HashMap<String, Vec<u8>>, CacheError> {
88 Ok(self.connection.hgetall(self.key).await?)
89 }
90
91 async fn insert_field(&mut self, field: &str, value: &[u8]) -> Result<bool, CacheError> {
92 let result: i32 = self.connection.hset(self.key, field, value).await?;
93 let added = result > 1;
97 if added {
98 return Err(CacheError::ConstraintFailed(format!(
99 "Failed to set value for key : '{}', field : '{}'",
100 self.key, field
101 )));
102 }
103
104 Ok(added)
105 }
106
107 async fn field_exists(&mut self, field: &str) -> Result<bool, CacheError> {
108 Ok(self.connection.hexists(self.key, field).await?)
109 }
110
111 async fn delete_field(&mut self, field: &str) -> Result<bool, CacheError> {
112 let deleted_count: i32 = self.connection.hdel(self.key, &[field]).await?;
113 Ok(deleted_count > 0)
114 }
115}