celp_sdk/async_broker/
redis_broker.rs1use std::{borrow::Borrow, os::unix::net::UnixStream};
2
3use futures::{Stream, StreamExt};
4use redis::{
5 aio::{MultiplexedConnection, PubSub as RedisPubSub},
6 AsyncCommands, Client,
7};
8
9use crate::{
10 async_cache::{AsyncCache, CacheKind, CacheRaw},
11 warning,
12};
13
14use super::{
15 interface::{IntoPublisher, IntoSubscriber, Publish, SubscribeError},
16 PublishError,
17};
18use super::{Message, Subscribe};
19
/// Filesystem path of the Unix domain socket that is tried first when
/// connecting to the Redis broker.
const BROKER_ENDPOINT: &str = "/run/celp/redis.sock";

/// Redis URL used when the Unix socket at [`BROKER_ENDPOINT`] cannot be
/// connected to.
const BROKER_FALLBACK_ENDPOINT: &str = "redis://127.0.0.1/";
22
23impl From<redis::Msg> for Message {
24 fn from(redis_message: redis::Msg) -> Message {
25 Message {
26 data: redis_message.get_payload_bytes().to_vec(),
27 topic: redis_message.get_channel_name().into(),
28 }
29 }
30}
31
32#[derive(Clone)]
59pub struct AsyncBroker {
60 client: Client,
61}
62
63impl AsyncBroker {
64 pub fn new() -> Result<Self, SubscribeError> {
66 Ok(Self {
67 client: get_redis_client()?,
68 })
69 }
70}
71
72fn get_redis_client() -> Result<redis::Client, SubscribeError> {
73 let client = match UnixStream::connect(BROKER_ENDPOINT) {
74 Ok(_) => Client::open(format!("unix:{}", BROKER_ENDPOINT))?,
75 Err(e) => {
76 warning!(
77 "Unable to open cache connection to {}: {e:?}. Trying fallback {}",
78 BROKER_ENDPOINT,
79 BROKER_FALLBACK_ENDPOINT
80 );
81 Client::open(BROKER_FALLBACK_ENDPOINT)?
82 }
83 };
84
85 Ok(client)
86}
87
88#[async_trait::async_trait]
89impl IntoSubscriber for AsyncBroker {
90 async fn into_subscriber(self) -> Result<impl Subscribe, SubscribeError> {
91 Ok(AsyncBrokerSubscriber {
92 pub_sub: self.client.get_async_pubsub().await?,
93 })
94 }
95}
96
97#[async_trait::async_trait]
98impl IntoPublisher for AsyncBroker {
99 async fn into_publisher(self) -> Result<impl Publish, PublishError> {
100 let connection = self.client.get_multiplexed_async_connection().await?;
101
102 Ok(AsyncBrokerPublisher {
103 system_event_cache: AsyncCache::with_connection(
104 CacheKind::SystemEvent,
105 connection.clone(),
106 ),
107 connection,
108 })
109 }
110}
111
112pub struct AsyncBrokerSubscriber {
115 pub_sub: RedisPubSub,
116}
117
118pub struct AsyncBrokerPublisher {
121 connection: MultiplexedConnection,
122 system_event_cache: AsyncCache,
123}
124
125#[async_trait::async_trait]
126impl Subscribe for AsyncBrokerSubscriber {
127 async fn subscribe(
128 &mut self,
129 topic: impl Borrow<str> + Send,
130 ) -> Result<&mut Self, SubscribeError> {
131 self.pub_sub.subscribe(topic.borrow()).await?;
132 Ok(self)
133 }
134
135 async fn subscribe_pattern(
136 &mut self,
137 pattern: impl Borrow<str> + Send,
138 ) -> Result<&mut Self, SubscribeError> {
139 self.pub_sub.psubscribe(pattern.borrow()).await?;
140 Ok(self)
141 }
142
143 async fn into_message_stream(self) -> impl Stream<Item = Message> + Send + Sync + Unpin {
144 self.pub_sub.into_on_message().map(|msg| msg.into())
145 }
146}
147
148#[async_trait::async_trait]
149impl Publish for AsyncBrokerPublisher {
150 async fn publish_raw(
151 &mut self,
152 topic: &str,
153 msg: &[u8],
154 store_in_cache: bool,
155 ) -> Result<(), PublishError> {
156 let _: () = self.connection.publish(topic, msg).await?;
157 if store_in_cache {
158 self.system_event_cache.insert_field(topic, msg).await?;
159 }
160
161 Ok(())
162 }
163}