celp_sdk/async_broker/
redis_broker.rs

1use std::{borrow::Borrow, os::unix::net::UnixStream};
2
3use futures::{Stream, StreamExt};
4use redis::{
5    aio::{MultiplexedConnection, PubSub as RedisPubSub},
6    AsyncCommands, Client,
7};
8
9use crate::{
10    async_cache::{AsyncCache, CacheKind, CacheRaw},
11    warning,
12};
13
14use super::{
15    interface::{IntoPublisher, IntoSubscriber, Publish, SubscribeError},
16    PublishError,
17};
18use super::{Message, Subscribe};
19
20const BROKER_ENDPOINT: &str = "/run/celp/redis.sock";
21const BROKER_FALLBACK_ENDPOINT: &str = "redis://127.0.0.1/";
22
23impl From<redis::Msg> for Message {
24    fn from(redis_message: redis::Msg) -> Message {
25        Message {
26            data: redis_message.get_payload_bytes().to_vec(),
27            topic: redis_message.get_channel_name().into(),
28        }
29    }
30}
31
32/// Publish-subscribe broker handle.
33///
34/// # Example usage
35///
36/// ```no_run
37/// use celp_sdk::async_broker::{AsyncBroker, IntoPublisher, IntoSubscriber, Subscribe};
38/// use futures::StreamExt;
39/// let broker = AsyncBroker::new().unwrap();
40/// # tokio_test::block_on(async move {
41/// let publisher = broker.clone().into_publisher().await.unwrap();
42/// tokio::spawn(async {
43///     let mut subscriber = broker.into_subscriber().await.unwrap();
44///     subscriber
45///         .subscribe("example.channel")
46///         .await
47///         .unwrap();
48///     // The `.boxed()` call can usually be omitted; it is included here due to
49///     // https://github.com/rust-lang/rust/issues/100013
50///     let mut stream = subscriber.into_message_stream().await.boxed();
51///     while let Some(msg) = stream.next().await {
52///         // handle `msg`
53///     }
54/// });
55/// // You can use `publisher.publish()` etc. to publish your own messages
56/// # });
57/// ```
58#[derive(Clone)]
59pub struct AsyncBroker {
60    client: Client,
61}
62
63impl AsyncBroker {
64    /// Create a new instance of broker with default config
65    pub fn new() -> Result<Self, SubscribeError> {
66        Ok(Self {
67            client: get_redis_client()?,
68        })
69    }
70}
71
72fn get_redis_client() -> Result<redis::Client, SubscribeError> {
73    let client = match UnixStream::connect(BROKER_ENDPOINT) {
74        Ok(_) => Client::open(format!("unix:{}", BROKER_ENDPOINT))?,
75        Err(e) => {
76            warning!(
77                "Unable to open cache connection to {}: {e:?}. Trying fallback {}",
78                BROKER_ENDPOINT,
79                BROKER_FALLBACK_ENDPOINT
80            );
81            Client::open(BROKER_FALLBACK_ENDPOINT)?
82        }
83    };
84
85    Ok(client)
86}
87
88#[async_trait::async_trait]
89impl IntoSubscriber for AsyncBroker {
90    async fn into_subscriber(self) -> Result<impl Subscribe, SubscribeError> {
91        Ok(AsyncBrokerSubscriber {
92            pub_sub: self.client.get_async_pubsub().await?,
93        })
94    }
95}
96
97#[async_trait::async_trait]
98impl IntoPublisher for AsyncBroker {
99    async fn into_publisher(self) -> Result<impl Publish, PublishError> {
100        let connection = self.client.get_multiplexed_async_connection().await?;
101
102        Ok(AsyncBrokerPublisher {
103            system_event_cache: AsyncCache::with_connection(
104                CacheKind::SystemEvent,
105                connection.clone(),
106            ),
107            connection,
108        })
109    }
110}
111
112/// Temporary object created by [`Broker`]'s implementation of [`IntoSubscriber`]
113/// used to subscribe to pubsub channels.
114pub struct AsyncBrokerSubscriber {
115    pub_sub: RedisPubSub,
116}
117
118/// Publishing endpoint returned [`Broker`]'s implementation of [`IntoPublisher`]
119/// used to publish pubsub messages.
120pub struct AsyncBrokerPublisher {
121    connection: MultiplexedConnection,
122    system_event_cache: AsyncCache,
123}
124
125#[async_trait::async_trait]
126impl Subscribe for AsyncBrokerSubscriber {
127    async fn subscribe(
128        &mut self,
129        topic: impl Borrow<str> + Send,
130    ) -> Result<&mut Self, SubscribeError> {
131        self.pub_sub.subscribe(topic.borrow()).await?;
132        Ok(self)
133    }
134
135    async fn subscribe_pattern(
136        &mut self,
137        pattern: impl Borrow<str> + Send,
138    ) -> Result<&mut Self, SubscribeError> {
139        self.pub_sub.psubscribe(pattern.borrow()).await?;
140        Ok(self)
141    }
142
143    async fn into_message_stream(self) -> impl Stream<Item = Message> + Send + Sync + Unpin {
144        self.pub_sub.into_on_message().map(|msg| msg.into())
145    }
146}
147
148#[async_trait::async_trait]
149impl Publish for AsyncBrokerPublisher {
150    async fn publish_raw(
151        &mut self,
152        topic: &str,
153        msg: &[u8],
154        store_in_cache: bool,
155    ) -> Result<(), PublishError> {
156        let _: () = self.connection.publish(topic, msg).await?;
157        if store_in_cache {
158            self.system_event_cache.insert_field(topic, msg).await?;
159        }
160
161        Ok(())
162    }
163}