celp_sdk/cache/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
//! Provides an interface to read from / update the CELP cache
//!
//! This include functionality to add / delete fields to / from the cache.

use std::collections::HashMap;
use std::os::unix::net::UnixStream;

use prost::Message;
use redis::{Client, Commands};
use thiserror::Error;

use crate::protobuf::se::AppInfo;
use crate::warning;

const CACHE_ENDPOINT: &str = "/run/celp/redis.sock";
const CACHE_FALLBACK_ENDPOINT: &str = "redis://127.0.0.1/";

const APP_INFO_HASH_KEY: &str = "celp:app_info";

#[derive(Error, Debug)]
pub enum CacheError {
    #[error(transparent)]
    RedisError(#[from] redis::RedisError),
    #[error("constraint failed: {0}")]
    ContstraintFailed(String),
}

// Define a struct to store the Redis connection
pub struct Cache {
    connection: redis::Connection,
}

impl Cache {
    /// Creates a new Cache instance.
    ///
    /// This function establishes a connection to a Redis server and returns a Result
    /// containing the Cache instance if successful, or a Boxed error if an error occurs.
    ///
    /// # Errors
    ///
    /// Returns a Boxed error if:
    ///
    /// - The URL for the Redis server is invalid or unreachable.
    /// - The connection to the Redis server cannot be established.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    ///     use celp_sdk::cache::Cache;
    ///
    ///     let cache = Cache::new();
    /// ```
    pub fn new() -> Result<Self, CacheError> {
        let client = match UnixStream::connect(CACHE_ENDPOINT) {
            Ok(_) => Client::open(format!("unix:{}", CACHE_ENDPOINT))?,
            Err(e) => {
                warning!(
                    "Unable to open cache connection to {}: {e:?}. Trying fallback {}",
                    CACHE_ENDPOINT,
                    CACHE_FALLBACK_ENDPOINT
                );
                Client::open(CACHE_FALLBACK_ENDPOINT)?
            }
        };
        let connection = client.get_connection()?;

        Ok(Self { connection })
    }

    /// Adds a field and its corresponding value to a Redis hash.
    ///
    /// # Arguments
    ///
    /// * `key` - The key of the Redis hash.
    /// * `field` - The field to set.
    /// * `value` - The value to set for the field.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use celp_sdk::cache::Cache;
    ///
    /// let cache = Cache::new();
    /// if let Ok(mut c) = cache {
    ///     c.add_field("celp:app_info", "restful-svc", "serialized-version-str".as_bytes());
    /// }
    /// ```
    pub fn add_field(&mut self, key: &str, field: &str, value: &[u8]) -> Result<i32, CacheError> {
        let result: i32 = self.connection.hset(key, field, value)?;
        //  The function returns the number of added or updated fields. result is 1 for addition, 0 for an update,
        //  result>1 indicates unexpected additional fields added or updated.
        if result > 1 {
            return Err(CacheError::ContstraintFailed(format!(
                "Failed to set value for key : '{}',  field : '{}'",
                key, field
            )));
        }

        Ok(result)
    }

    /// Retrieves all fields and their corresponding values for a key from a Redis hash.
    ///
    /// # Arguments
    ///
    /// * `key` - The key of the Redis hash.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use celp_sdk::cache::Cache;
    ///
    /// let cache = Cache::new();
    /// if let Ok(mut c) = cache {
    ///     let data = c.get_fields("celp:app_info");
    /// }
    /// ```
    pub fn get_fields(&mut self, key: &str) -> Result<HashMap<String, String>, CacheError> {
        Ok(self.connection.hgetall(key)?)
    }

    /// Retrieves all field names from a Redis hash for a given key.
    ///
    /// # Arguments
    ///
    /// * `key` - The key of the Redis hash.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use celp_sdk::cache::Cache;
    ///
    /// let cache = Cache::new();
    /// if let Ok(mut c) = cache {
    ///     let field_names = c.get_field_names("celp:app_info");
    /// }
    /// ```
    pub fn get_field_names(&mut self, key: &str) -> Result<Vec<String>, CacheError> {
        Ok(self.connection.hkeys(key)?)
    }

    /// Retrieves a specific field's value from a Redis hash.
    ///
    /// # Arguments
    ///
    /// * `key` - The key of the Redis hash.
    /// * `field` - The field whose value you want to retrieve.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use celp_sdk::cache::Cache;
    ///
    /// let cache = Cache::new();
    /// if let Ok(mut c) = cache {
    ///     let value = c.get_field_value("celp:app_info", "field_name");
    /// }
    /// ```
    pub fn get_field_value(
        &mut self,
        key: &str,
        field: &str,
    ) -> Result<Option<String>, CacheError> {
        let result = self.connection.hget(key, field)?;
        Ok(result)
    }

    /// Checks if a specific field exists in a Redis hash.
    ///
    /// # Arguments
    ///
    /// * `key` - The key of the Redis hash.
    /// * `field` - The field you want to check for existence.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use celp_sdk::cache::Cache;
    ///
    /// let cache = Cache::new();
    /// if let Ok(mut c) = cache {
    ///     let exists = c.field_exists("celp:app_info", "field_name");
    /// }
    /// ```
    pub fn field_exists(&mut self, key: &str, field: &str) -> Result<bool, CacheError> {
        let result: bool = self.connection.hexists(key, field)?;
        Ok(result)
    }

    /// Deletes a specific field from a Redis hash.
    ///
    /// # Arguments
    ///
    /// * `key` - The key of the Redis hash.
    /// * `field` - The field you want to delete.
    ///
    /// # Example
    ///
    /// ```rust,no_run
    /// use celp_sdk::cache::Cache;
    ///
    /// let cache = Cache::new();
    /// if let Ok(mut c) = cache {
    ///     let deleted = c.delete_field("celp:app_info", "field_name");
    /// }
    /// ```
    pub fn delete_field(&mut self, key: &str, field: &str) -> Result<bool, CacheError> {
        let deleted_count: i32 = self.connection.hdel(key, &[field])?;
        Ok(deleted_count > 0)
    }
}

/// Publish and the application information as SystemEvent to the broker, returns an error on failure
/// Also, caches the application information, returns an error on failure
/// # Arguments
///
/// * `app_info` - The application info
///
/// # Examples
/// ```rust,no_run
/// use celp_sdk::util::celp_app::build_app_info;
/// use celp_sdk::cache::publish_app_info;
///
/// let app_info = build_app_info("1.0.0").unwrap();
/// if let Err(e) = publish_app_info(app_info) {
///     eprintln!("unable to publish or cache app info: {e:#?}");
/// }
/// ```
pub fn publish_app_info(app_info: AppInfo) -> Result<(), CacheError> {
    let buf = app_info.encode_to_vec();

    // Adds a field and its corresponding value to a Redis hash
    let mut cache = Cache::new()?;
    cache.add_field(APP_INFO_HASH_KEY, &app_info.app_name, &buf)?;

    Ok(())
}