celp_sdk/broker/
redis_broker.rs

1use std::{os::unix::net::UnixStream, time::Duration};
2
3use redis::{Client, Commands};
4use thiserror::Error;
5
6use crate::{
7    cache::{Cache, CacheError},
8    protobuf::{build_system_event_source, pack_system_event, system_event::Severity},
9    warning,
10};
11
12use super::redis_pubsub::OwnedPubSub;
13use super::{Message, Publish, Subscribe};
14
15/*
16 * Constants
17 */
18
/// Cache key passed to `Cache::add_field` / `Cache::get_field_value`; the topic is used as the
/// field name beneath it (see `publish_with_cache` and `subscribe`).
const BROKER_CACHE_KEY: &str = "celp:broker_cache";
/// Path of the Unix domain socket that is probed first when creating the broker client.
const BROKER_ENDPOINT: &str = "/run/celp/redis.sock";
/// Redis URL used when the Unix socket at `BROKER_ENDPOINT` cannot be connected to.
const BROKER_FALLBACK_ENDPOINT: &str = "redis://127.0.0.1/";
22
23/*
24 * Custom Errors
25 */
26
/// Errors returned by the publish/subscribe operations of `Broker`.
#[derive(Error, Debug)]
pub enum BrokerError {
    /// An operation on the cache failed.
    #[error("cache error")]
    CacheError(#[from] CacheError),
    /// A protobuf message could not be encoded.
    #[error("unable to encode protobuf")]
    EncodeError(#[from] prost::EncodeError),
    /// An error reported by the Redis client; its message is forwarded unchanged.
    #[error(transparent)]
    RedisError(#[from] redis::RedisError),
}
36
37/*
38 * Trait implementations
39 */
40
41impl From<redis::Msg> for Message {
42    fn from(redis_message: redis::Msg) -> Message {
43        Message {
44            data: redis_message.get_payload_bytes().to_vec(),
45            topic: redis_message.get_channel_name().into(),
46        }
47    }
48}
49
50impl Publish for Broker {
51    type Error = BrokerError;
52
53    /// Function allowing to publish a protobuf message to the CELP broker
54    /// # Arguments
55    ///
56    /// * `topic`   - The topic to publish the message to
57    /// * `message` - The protobuf to be published
58    ///
59    /// # Examples
60    /// ```rust,no_run
61    /// use celp_sdk::{
62    ///     protobuf::se::{AppInfo, app_info::SemVer},
63    ///     broker::{Broker, Publish},
64    /// };
65    ///
66    /// let app_info = AppInfo::new("test", SemVer::default());
67    /// let mut broker = Broker::new().unwrap();
68    /// if let Err(e) = broker.publish("app.info", &app_info) {
69    ///     eprintln!("error publishing: {e:?}");
70    /// }
71    /// ```
72    fn publish<M: prost::Message>(&mut self, topic: &str, message: &M) -> Result<(), Self::Error> {
73        let buf = message.encode_to_vec();
74
75        self.publish_raw(topic, &buf)
76    }
77
78    /// Function allowing to publish a protobuf message to the CELP broker while caching the latest version
79    /// # Arguments
80    ///
81    /// * `topic`   - The topic to publish the message to
82    /// * `message` - The protobuf to be published
83    ///
84    /// # Examples
85    /// ```rust,no_run
86    /// use celp_sdk::{
87    ///     protobuf::se::{AppInfo, app_info::SemVer},
88    ///     broker::{Broker, Publish},
89    /// };
90    ///
91    /// let app_info = AppInfo::new("test", SemVer::default());
92    /// let mut broker = Broker::new().unwrap();
93    /// if let Err(e) = broker.publish_with_cache("app.info", &app_info) {
94    ///     eprintln!("error publishing: {e:?}");
95    /// }
96    /// ```
97    fn publish_with_cache<M: prost::Message>(
98        &mut self,
99        topic: &str,
100        message: &M,
101    ) -> Result<(), Self::Error> {
102        let buf = message.encode_to_vec();
103
104        self.cache.add_field(BROKER_CACHE_KEY, topic, &buf)?;
105        self.publish_raw(topic, &buf)
106    }
107
108    /// Shorthand function for publishing a system event to the CELP broker while caching the latest version.
109    /// The broker will perform the required wrapping of the system event
110    /// # Arguments
111    ///
112    /// * `event_details`   - the system event details to publish
113    /// * `severity`        - The severity with which to send the system event
114    ///
115    /// # Examples
116    /// ```rust,no_run
117    /// use celp_sdk::{
118    ///     protobuf::{se::{AppInfo, app_info::SemVer}, system_event::Severity},
119    ///     broker::{Broker, Publish},
120    /// };
121    ///
122    /// let app_info = AppInfo::new("test", SemVer::default());
123    /// let mut broker = Broker::new().unwrap();
124    /// if let Err(e) = broker.publish_event(&app_info, Severity::Info) {
125    ///     eprintln!("error publishing: {e:?}");
126    /// }
127    /// ```
128    fn publish_event<E: prost::Message + prost::Name>(
129        &mut self,
130        event_details: &E,
131        severity: Severity,
132    ) -> Result<(), Self::Error> {
133        let event = pack_system_event(event_details, severity)?;
134
135        self.publish_with_cache(&event.source, &event)
136    }
137
138    /// Function allowing to publish a byte array to the CELP broker
139    /// # Arguments
140    ///
141    /// * `topic`   - The topic to publish the message to
142    /// * `message` - The protobuf to be published
143    ///
144    /// # Examples
145    /// ```rust,no_run
146    /// use prost::Message;
147    /// use celp_sdk::{
148    ///     protobuf::se::{AppInfo, app_info::SemVer},
149    ///     broker::{Broker, Publish},
150    /// };
151    ///
152    /// let app_info = AppInfo::new("test", SemVer::default());
153    /// let app_info_data = app_info.encode_to_vec();
154    /// let mut broker = Broker::new().unwrap();
155    /// if let Err(e) = broker.publish_raw("app.info", &app_info_data) {
156    ///     eprintln!("error publishing: {e:?}");
157    /// }
158    /// ```
159    fn publish_raw(&mut self, topic: &str, data: &[u8]) -> Result<(), Self::Error> {
160        Ok(self.connection.publish(topic, data)?)
161    }
162}
163
164impl Subscribe for Broker {
165    type Error = BrokerError;
166
167    /// Function allowing subscription to the CELP broker for a specific topic
168    /// # Arguments
169    ///
170    /// * `topic`   - The topic to subscribe to
171    ///
172    /// # Examples
173    /// ```rust,no_run
174    /// use celp_sdk::{
175    ///     broker::{Broker, Subscribe},
176    /// };
177    ///
178    /// let mut broker = Broker::new().unwrap();
179    /// if let Err(e) = broker.subscribe("app.info") {
180    ///     eprintln!("error subscribing: {e:?}");
181    /// }
182    /// ```
183    fn subscribe(&mut self, topic: &str) -> Result<Option<Message>, Self::Error> {
184        self.pubsub.subscribe(topic)?;
185
186        let message = match self.cache.get_field_value(BROKER_CACHE_KEY, topic) {
187            Ok(Some(val)) => Some(Message {
188                data: val.into_bytes(),
189                topic: topic.into(),
190            }),
191            Ok(_) => None,
192            Err(e) => {
193                warning!("Unable to retrieve cached data for {}: {e:?}", topic);
194                None
195            }
196        };
197
198        Ok(message)
199    }
200
201    /// Function allowing subscription to the CELP broker for a pattern
202    /// # Arguments
203    ///
204    /// * `pattern` - The pattern to subscribe to
205    ///
206    /// # Examples
207    /// ```rust,no_run
208    /// use celp_sdk::{
209    ///     broker::{Broker, Subscribe},
210    /// };
211    ///
212    /// let mut broker = Broker::new().unwrap();
213    /// if let Err(e) = broker.subscribe_pattern("app.info.*") {
214    ///     eprintln!("error subscribing: {e:?}");
215    /// }
216    /// ```
217    fn subscribe_pattern(&mut self, pattern: &str) -> Result<(), Self::Error> {
218        Ok(self.pubsub.psubscribe(pattern)?)
219    }
220
221    /// Function allowing subscription to the CELP broker for a specific system event
222    /// # Examples
223    /// ```rust,no_run
224    /// use celp_sdk::{
225    ///     protobuf::se::AppInfo,
226    ///     broker::{Broker, Subscribe},
227    /// };
228    ///
229    /// let mut broker = Broker::new().unwrap();
230    /// if let Err(e) = broker.subscribe_event::<AppInfo>() {
231    ///     eprintln!("error subscribing: {e:?}");
232    /// }
233    /// ```
234    fn subscribe_event<E: prost::Message + prost::Name>(
235        &mut self,
236    ) -> Result<Option<Message>, Self::Error> {
237        self.subscribe(&build_system_event_source::<E>())
238    }
239
240    /// Function allowing unsubscription from the CELP broker for a specific topic
241    /// # Arguments
242    ///
243    /// * `topic`   - The topic to unsubscribe for
244    ///
245    /// # Examples
246    /// ```rust,no_run
247    /// use celp_sdk::{
248    ///     broker::{Broker, Subscribe},
249    /// };
250    ///
251    /// let mut broker = Broker::new().unwrap();
252    /// if let Err(e) = broker.unsubscribe("app.info") {
253    ///     eprintln!("error unsubscribing: {e:?}");
254    /// }
255    /// ```
256    fn unsubscribe(&mut self, topic: &str) -> Result<(), Self::Error> {
257        Ok(self.pubsub.unsubscribe(topic)?)
258    }
259
260    /// Function allowing unsubscription from the CELP broker for a pattern
261    /// # Arguments
262    ///
263    /// * `pattern` - The pattern to unsubscribe for
264    ///
265    /// # Examples
266    /// ```rust,no_run
267    /// use celp_sdk::{
268    ///     broker::{Broker, Subscribe},
269    /// };
270    ///
271    /// let mut broker = Broker::new().unwrap();
272    /// if let Err(e) = broker.unsubscribe_pattern("app.info.*") {
273    ///     eprintln!("error unsubscribing: {e:?}");
274    /// }
275    /// ```
276    fn unsubscribe_pattern(&mut self, pattern: &str) -> Result<(), Self::Error> {
277        Ok(self.pubsub.punsubscribe(pattern)?)
278    }
279
280    /// Function allowing unsubscription from the CELP broker for a specific system event
281    /// # Examples
282    /// ```rust,no_run
283    /// use celp_sdk::{
284    ///     protobuf::se::AppInfo,
285    ///     broker::{Broker, Subscribe},
286    /// };
287    ///
288    /// let mut broker = Broker::new().unwrap();
289    /// if let Err(e) = broker.unsubscribe_event::<AppInfo>() {
290    ///     eprintln!("error unsubscribing: {e:?}");
291    /// }
292    /// ```
293    fn unsubscribe_event<E: prost::Message + prost::Name>(&mut self) -> Result<(), Self::Error> {
294        self.unsubscribe(&build_system_event_source::<E>())
295    }
296
297    /// Function allowing to read messages from the broker
298    /// # Examples
299    /// ```rust,no_run
300    /// use std::time::Duration;
301    /// use celp_sdk::{
302    ///     protobuf::{se::AppInfo, build_system_event_source, unpack_system_event_raw},
303    ///     broker::{Broker, Subscribe, BrokerError},
304    /// };
305    ///
306    /// let mut broker = Broker::new().unwrap()
307    ///     .set_read_timeout(Duration::from_millis(100))
308    ///     .unwrap();
309    /// broker.subscribe_event::<AppInfo>().unwrap();
310    ///
311    /// let app_info_topic = build_system_event_source::<AppInfo>();
312    ///
313    /// loop {
314    ///     match broker.get_message() {
315    ///         Ok(message) => {
316    ///             if message.topic == app_info_topic {
317    ///                 let app_info = unpack_system_event_raw::<AppInfo>(&message.data).unwrap();
318    ///                 // Handle event here
319    ///             }
320    ///         }
321    ///         Err(BrokerError::RedisError(e)) if e.is_timeout() => continue,
322    ///         Err(e) => eprintln!("Unable to receive messages from broker: {e:?}"),
323    ///     }
324    /// }
325    /// ```
326    fn get_message(&mut self) -> Result<super::interface::Message, Self::Error> {
327        Ok(self.pubsub.get_message()?.into())
328    }
329}
330
331/*
332 * Broker implementation
333 */
334
/// Client for the CELP broker; the `Publish` and `Subscribe` traits are implemented on it.
pub struct Broker {
    /// Cache in which `publish_with_cache` stores messages and from which `subscribe` reads them.
    cache: Cache,
    /// Connection used for publishing.
    connection: redis::Connection,
    /// Wrapper around a second connection, used for subscriptions and for reading messages.
    pubsub: OwnedPubSub,
}
340
341impl Broker {
342    /// Create a new instance of broker
343    /// # Examples
344    /// ```rust,no_run
345    /// use celp_sdk::broker::Broker;
346    ///
347    /// let broker = Broker::new().unwrap();
348    /// ```
349    pub fn new() -> Result<Self, BrokerError> {
350        let client = Self::get_client()?;
351        let connection = client.get_connection()?;
352        let sub_connection = client.get_connection()?;
353
354        Ok(Self {
355            cache: Cache::new()?,
356            connection,
357            pubsub: OwnedPubSub::new(sub_connection),
358        })
359    }
360
361    /// Set the read timeout, i.e. the amount of time the get_message call will block before returning when no
362    /// messages can be read
363    /// # Examples
364    /// ```rust,no_run
365    /// use std::time::Duration;
366    /// use celp_sdk::broker::Broker;
367    ///
368    /// let broker = Broker::new().unwrap()
369    ///     .set_read_timeout(Duration::from_millis(100))
370    ///     .unwrap();
371    /// ```
372    pub fn set_read_timeout(self, dur: Duration) -> Result<Self, BrokerError> {
373        self.pubsub.set_read_timeout(Some(dur))?;
374
375        Ok(self)
376    }
377
378    fn get_client() -> Result<redis::Client, BrokerError> {
379        let client = match UnixStream::connect(BROKER_ENDPOINT) {
380            Ok(_) => Client::open(format!("unix:{}", BROKER_ENDPOINT))?,
381            Err(e) => {
382                warning!(
383                    "Unable to open cache connection to {}: {e:?}. Trying fallback {}",
384                    BROKER_ENDPOINT,
385                    BROKER_FALLBACK_ENDPOINT
386                );
387                Client::open(BROKER_FALLBACK_ENDPOINT)?
388            }
389        };
390
391        Ok(client)
392    }
393}