celp_sdk/broker/redis_broker.rs
1use std::{os::unix::net::UnixStream, time::Duration};
2
3use redis::{Client, Commands};
4use thiserror::Error;
5
6use crate::{
7 cache::{Cache, CacheError},
8 protobuf::{build_system_event_source, pack_system_event, system_event::Severity},
9 warning,
10};
11
12use super::redis_pubsub::OwnedPubSub;
13use super::{Message, Publish, Subscribe};
14
15/*
16 * Constants
17 */
18
19const BROKER_CACHE_KEY: &str = "celp:broker_cache";
20const BROKER_ENDPOINT: &str = "/run/celp/redis.sock";
21const BROKER_FALLBACK_ENDPOINT: &str = "redis://127.0.0.1/";
22
23/*
24 * Custom Errors
25 */
26
27#[derive(Error, Debug)]
28pub enum BrokerError {
29 #[error("cache error")]
30 CacheError(#[from] CacheError),
31 #[error("unable to encode protobuf")]
32 EncodeError(#[from] prost::EncodeError),
33 #[error(transparent)]
34 RedisError(#[from] redis::RedisError),
35}
36
37/*
38 * Trait implementations
39 */
40
41impl From<redis::Msg> for Message {
42 fn from(redis_message: redis::Msg) -> Message {
43 Message {
44 data: redis_message.get_payload_bytes().to_vec(),
45 topic: redis_message.get_channel_name().into(),
46 }
47 }
48}
49
50impl Publish for Broker {
51 type Error = BrokerError;
52
53 /// Function allowing to publish a protobuf message to the CELP broker
54 /// # Arguments
55 ///
56 /// * `topic` - The topic to publish the message to
57 /// * `message` - The protobuf to be published
58 ///
59 /// # Examples
60 /// ```rust,no_run
61 /// use celp_sdk::{
62 /// protobuf::se::{AppInfo, app_info::SemVer},
63 /// broker::{Broker, Publish},
64 /// };
65 ///
66 /// let app_info = AppInfo::new("test", SemVer::default());
67 /// let mut broker = Broker::new().unwrap();
68 /// if let Err(e) = broker.publish("app.info", &app_info) {
69 /// eprintln!("error publishing: {e:?}");
70 /// }
71 /// ```
72 fn publish<M: prost::Message>(&mut self, topic: &str, message: &M) -> Result<(), Self::Error> {
73 let buf = message.encode_to_vec();
74
75 self.publish_raw(topic, &buf)
76 }
77
78 /// Function allowing to publish a protobuf message to the CELP broker while caching the latest version
79 /// # Arguments
80 ///
81 /// * `topic` - The topic to publish the message to
82 /// * `message` - The protobuf to be published
83 ///
84 /// # Examples
85 /// ```rust,no_run
86 /// use celp_sdk::{
87 /// protobuf::se::{AppInfo, app_info::SemVer},
88 /// broker::{Broker, Publish},
89 /// };
90 ///
91 /// let app_info = AppInfo::new("test", SemVer::default());
92 /// let mut broker = Broker::new().unwrap();
93 /// if let Err(e) = broker.publish_with_cache("app.info", &app_info) {
94 /// eprintln!("error publishing: {e:?}");
95 /// }
96 /// ```
97 fn publish_with_cache<M: prost::Message>(
98 &mut self,
99 topic: &str,
100 message: &M,
101 ) -> Result<(), Self::Error> {
102 let buf = message.encode_to_vec();
103
104 self.cache.add_field(BROKER_CACHE_KEY, topic, &buf)?;
105 self.publish_raw(topic, &buf)
106 }
107
108 /// Shorthand function for publishing a system event to the CELP broker while caching the latest version.
109 /// The broker will perform the required wrapping of the system event
110 /// # Arguments
111 ///
112 /// * `event_details` - the system event details to publish
113 /// * `severity` - The severity with which to send the system event
114 ///
115 /// # Examples
116 /// ```rust,no_run
117 /// use celp_sdk::{
118 /// protobuf::{se::{AppInfo, app_info::SemVer}, system_event::Severity},
119 /// broker::{Broker, Publish},
120 /// };
121 ///
122 /// let app_info = AppInfo::new("test", SemVer::default());
123 /// let mut broker = Broker::new().unwrap();
124 /// if let Err(e) = broker.publish_event(&app_info, Severity::Info) {
125 /// eprintln!("error publishing: {e:?}");
126 /// }
127 /// ```
128 fn publish_event<E: prost::Message + prost::Name>(
129 &mut self,
130 event_details: &E,
131 severity: Severity,
132 ) -> Result<(), Self::Error> {
133 let event = pack_system_event(event_details, severity)?;
134
135 self.publish_with_cache(&event.source, &event)
136 }
137
138 /// Function allowing to publish a byte array to the CELP broker
139 /// # Arguments
140 ///
141 /// * `topic` - The topic to publish the message to
142 /// * `message` - The protobuf to be published
143 ///
144 /// # Examples
145 /// ```rust,no_run
146 /// use prost::Message;
147 /// use celp_sdk::{
148 /// protobuf::se::{AppInfo, app_info::SemVer},
149 /// broker::{Broker, Publish},
150 /// };
151 ///
152 /// let app_info = AppInfo::new("test", SemVer::default());
153 /// let app_info_data = app_info.encode_to_vec();
154 /// let mut broker = Broker::new().unwrap();
155 /// if let Err(e) = broker.publish_raw("app.info", &app_info_data) {
156 /// eprintln!("error publishing: {e:?}");
157 /// }
158 /// ```
159 fn publish_raw(&mut self, topic: &str, data: &[u8]) -> Result<(), Self::Error> {
160 Ok(self.connection.publish(topic, data)?)
161 }
162}
163
164impl Subscribe for Broker {
165 type Error = BrokerError;
166
167 /// Function allowing subscription to the CELP broker for a specific topic
168 /// # Arguments
169 ///
170 /// * `topic` - The topic to subscribe to
171 ///
172 /// # Examples
173 /// ```rust,no_run
174 /// use celp_sdk::{
175 /// broker::{Broker, Subscribe},
176 /// };
177 ///
178 /// let mut broker = Broker::new().unwrap();
179 /// if let Err(e) = broker.subscribe("app.info") {
180 /// eprintln!("error subscribing: {e:?}");
181 /// }
182 /// ```
183 fn subscribe(&mut self, topic: &str) -> Result<Option<Message>, Self::Error> {
184 self.pubsub.subscribe(topic)?;
185
186 let message = match self.cache.get_field_value(BROKER_CACHE_KEY, topic) {
187 Ok(Some(val)) => Some(Message {
188 data: val.into_bytes(),
189 topic: topic.into(),
190 }),
191 Ok(_) => None,
192 Err(e) => {
193 warning!("Unable to retrieve cached data for {}: {e:?}", topic);
194 None
195 }
196 };
197
198 Ok(message)
199 }
200
201 /// Function allowing subscription to the CELP broker for a pattern
202 /// # Arguments
203 ///
204 /// * `pattern` - The pattern to subscribe to
205 ///
206 /// # Examples
207 /// ```rust,no_run
208 /// use celp_sdk::{
209 /// broker::{Broker, Subscribe},
210 /// };
211 ///
212 /// let mut broker = Broker::new().unwrap();
213 /// if let Err(e) = broker.subscribe_pattern("app.info.*") {
214 /// eprintln!("error subscribing: {e:?}");
215 /// }
216 /// ```
217 fn subscribe_pattern(&mut self, pattern: &str) -> Result<(), Self::Error> {
218 Ok(self.pubsub.psubscribe(pattern)?)
219 }
220
221 /// Function allowing subscription to the CELP broker for a specific system event
222 /// # Examples
223 /// ```rust,no_run
224 /// use celp_sdk::{
225 /// protobuf::se::AppInfo,
226 /// broker::{Broker, Subscribe},
227 /// };
228 ///
229 /// let mut broker = Broker::new().unwrap();
230 /// if let Err(e) = broker.subscribe_event::<AppInfo>() {
231 /// eprintln!("error subscribing: {e:?}");
232 /// }
233 /// ```
234 fn subscribe_event<E: prost::Message + prost::Name>(
235 &mut self,
236 ) -> Result<Option<Message>, Self::Error> {
237 self.subscribe(&build_system_event_source::<E>())
238 }
239
240 /// Function allowing unsubscription from the CELP broker for a specific topic
241 /// # Arguments
242 ///
243 /// * `topic` - The topic to unsubscribe for
244 ///
245 /// # Examples
246 /// ```rust,no_run
247 /// use celp_sdk::{
248 /// broker::{Broker, Subscribe},
249 /// };
250 ///
251 /// let mut broker = Broker::new().unwrap();
252 /// if let Err(e) = broker.unsubscribe("app.info") {
253 /// eprintln!("error unsubscribing: {e:?}");
254 /// }
255 /// ```
256 fn unsubscribe(&mut self, topic: &str) -> Result<(), Self::Error> {
257 Ok(self.pubsub.unsubscribe(topic)?)
258 }
259
260 /// Function allowing unsubscription from the CELP broker for a pattern
261 /// # Arguments
262 ///
263 /// * `pattern` - The pattern to unsubscribe for
264 ///
265 /// # Examples
266 /// ```rust,no_run
267 /// use celp_sdk::{
268 /// broker::{Broker, Subscribe},
269 /// };
270 ///
271 /// let mut broker = Broker::new().unwrap();
272 /// if let Err(e) = broker.unsubscribe_pattern("app.info.*") {
273 /// eprintln!("error unsubscribing: {e:?}");
274 /// }
275 /// ```
276 fn unsubscribe_pattern(&mut self, pattern: &str) -> Result<(), Self::Error> {
277 Ok(self.pubsub.punsubscribe(pattern)?)
278 }
279
280 /// Function allowing unsubscription from the CELP broker for a specific system event
281 /// # Examples
282 /// ```rust,no_run
283 /// use celp_sdk::{
284 /// protobuf::se::AppInfo,
285 /// broker::{Broker, Subscribe},
286 /// };
287 ///
288 /// let mut broker = Broker::new().unwrap();
289 /// if let Err(e) = broker.unsubscribe_event::<AppInfo>() {
290 /// eprintln!("error unsubscribing: {e:?}");
291 /// }
292 /// ```
293 fn unsubscribe_event<E: prost::Message + prost::Name>(&mut self) -> Result<(), Self::Error> {
294 self.unsubscribe(&build_system_event_source::<E>())
295 }
296
297 /// Function allowing to read messages from the broker
298 /// # Examples
299 /// ```rust,no_run
300 /// use std::time::Duration;
301 /// use celp_sdk::{
302 /// protobuf::{se::AppInfo, build_system_event_source, unpack_system_event_raw},
303 /// broker::{Broker, Subscribe, BrokerError},
304 /// };
305 ///
306 /// let mut broker = Broker::new().unwrap()
307 /// .set_read_timeout(Duration::from_millis(100))
308 /// .unwrap();
309 /// broker.subscribe_event::<AppInfo>().unwrap();
310 ///
311 /// let app_info_topic = build_system_event_source::<AppInfo>();
312 ///
313 /// loop {
314 /// match broker.get_message() {
315 /// Ok(message) => {
316 /// if message.topic == app_info_topic {
317 /// let app_info = unpack_system_event_raw::<AppInfo>(&message.data).unwrap();
318 /// // Handle event here
319 /// }
320 /// }
321 /// Err(BrokerError::RedisError(e)) if e.is_timeout() => continue,
322 /// Err(e) => eprintln!("Unable to receive messages from broker: {e:?}"),
323 /// }
324 /// }
325 /// ```
326 fn get_message(&mut self) -> Result<super::interface::Message, Self::Error> {
327 Ok(self.pubsub.get_message()?.into())
328 }
329}
330
331/*
332 * Broker implementation
333 */
334
335pub struct Broker {
336 cache: Cache,
337 connection: redis::Connection,
338 pubsub: OwnedPubSub,
339}
340
341impl Broker {
342 /// Create a new instance of broker
343 /// # Examples
344 /// ```rust,no_run
345 /// use celp_sdk::broker::Broker;
346 ///
347 /// let broker = Broker::new().unwrap();
348 /// ```
349 pub fn new() -> Result<Self, BrokerError> {
350 let client = Self::get_client()?;
351 let connection = client.get_connection()?;
352 let sub_connection = client.get_connection()?;
353
354 Ok(Self {
355 cache: Cache::new()?,
356 connection,
357 pubsub: OwnedPubSub::new(sub_connection),
358 })
359 }
360
361 /// Set the read timeout, i.e. the amount of time the get_message call will block before returning when no
362 /// messages can be read
363 /// # Examples
364 /// ```rust,no_run
365 /// use std::time::Duration;
366 /// use celp_sdk::broker::Broker;
367 ///
368 /// let broker = Broker::new().unwrap()
369 /// .set_read_timeout(Duration::from_millis(100))
370 /// .unwrap();
371 /// ```
372 pub fn set_read_timeout(self, dur: Duration) -> Result<Self, BrokerError> {
373 self.pubsub.set_read_timeout(Some(dur))?;
374
375 Ok(self)
376 }
377
378 fn get_client() -> Result<redis::Client, BrokerError> {
379 let client = match UnixStream::connect(BROKER_ENDPOINT) {
380 Ok(_) => Client::open(format!("unix:{}", BROKER_ENDPOINT))?,
381 Err(e) => {
382 warning!(
383 "Unable to open cache connection to {}: {e:?}. Trying fallback {}",
384 BROKER_ENDPOINT,
385 BROKER_FALLBACK_ENDPOINT
386 );
387 Client::open(BROKER_FALLBACK_ENDPOINT)?
388 }
389 };
390
391 Ok(client)
392 }
393}