celp_sdk/async_broker/
interface.rs

1use futures::Stream;
2use std::{borrow::Borrow, result::Result};
3
4use crate::{
5    async_cache,
6    protobuf::{
7        build_system_event_source, pack_system_event, system_event::Severity as ProstSeverity,
8    },
9};
10
/// Serialized pub-sub message: a topic name paired with its raw payload bytes.
///
/// The type is plain data, so it derives `Debug`, `Clone`, `PartialEq` and `Eq` to let
/// callers log, duplicate and compare messages (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Message {
    /// Pubsub topic on which this message arrived or is intended for.
    /// Note that when subscribing to patterns, `topic` is the concrete topic
    /// rather than the pattern it matched.
    pub topic: String,

    /// Message data, kept as serialized bytes.
    pub data: Vec<u8>,
}
21
/// System event severity.
///
/// The derived ordering follows declaration order, so
/// `Info < Warning < Error < Fatal`. `Hash` is derived as well, which allows a
/// severity to be used as a key in hashed collections.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Severity {
    /// Lowest severity in the ordering.
    Info,
    /// Ranks above [`Severity::Info`].
    Warning,
    /// Ranks above [`Severity::Warning`].
    Error,
    /// Highest severity in the ordering.
    Fatal,
}
30
31impl From<Severity> for ProstSeverity {
32    fn from(value: Severity) -> Self {
33        match value {
34            Severity::Info => ProstSeverity::Info,
35            Severity::Warning => ProstSeverity::Warning,
36            Severity::Error => ProstSeverity::Error,
37            Severity::Fatal => ProstSeverity::Fatal,
38        }
39    }
40}
41
42#[derive(thiserror::Error, Debug)]
43pub enum SubscribeError {
44    #[error("broker error")]
45    BrokerError(#[from] redis::RedisError),
46}
47
48#[derive(thiserror::Error, Debug)]
49pub enum PublishError {
50    #[error("cache error")]
51    CacheError(#[from] async_cache::CacheError),
52    #[error("serialization error")]
53    EncodeError(#[from] prost::EncodeError),
54    #[error("broker error")]
55    BrokerError(#[from] redis::RedisError),
56}
57
58#[derive(thiserror::Error, Debug)]
59#[error(transparent)]
60pub struct DecodeError(#[from] prost::DecodeError);
61
62/// Represents the publishing end of a pub-sub broker client.
63/// `publish` and `publish_event` have some minor serialization boilerplate built-in into their
64/// default implementations.
65#[async_trait::async_trait]
66pub trait Publish: Send {
67    /// Publishes raw bytes onto the given topic, optionally storing that message in cache.
68    async fn publish_raw(
69        &mut self,
70        topic: &str,
71        msg: &[u8],
72        store_in_cache: bool,
73    ) -> Result<(), PublishError>;
74
75    /// Publishes a message with a specific type with automatic serialization.
76    async fn publish<M: prost::Message + 'static>(
77        &mut self,
78        topic: &str,
79        msg: M,
80    ) -> Result<(), PublishError> {
81        let buf = msg.encode_to_vec();
82        self.publish_raw(topic, &buf, false).await
83    }
84
85    /// Publishes a system event with given severity. System events are special messages whose
86    /// topic names are derivable from their type name, therefore this method does not accept
87    /// a topic argument.
88    async fn publish_event<E: prost::Message + prost::Name + 'static>(
89        &mut self,
90        event_details: E,
91        severity: Severity,
92    ) -> Result<(), PublishError> {
93        use prost::Message;
94
95        let event = pack_system_event(&event_details, severity.into())?;
96        let buf = event.encode_to_vec();
97        self.publish_raw(&event.source, &buf, true).await
98    }
99}
100
101/// Represents the subscribe end of a pub-sub broker client.
102/// Objects of this type are meant to accumulate subscriptions and be consumed when converting
103/// to a message stream.
104#[async_trait::async_trait]
105pub trait Subscribe: Send {
106    /// Subscribes to specific topic.
107    async fn subscribe(
108        &mut self,
109        topic: impl Borrow<str> + Send,
110    ) -> Result<&mut Self, SubscribeError>;
111
112    /// Subscribes to a pattern, i.e. multiple matching topics.
113    async fn subscribe_pattern(
114        &mut self,
115        pattern: impl Borrow<str> + Send,
116    ) -> Result<&mut Self, SubscribeError>;
117
118    /// Consumes this object, returning a [`futures::Stream`] which is ready to yield messages
119    /// arriving at previously subscribed topics or patterns.
120    async fn into_message_stream(self) -> impl Stream<Item = Message> + Send + Sync + Unpin;
121
122    /// Subscribes to system event topic.
123    async fn subscribe_event<E: DecodeEvent>(&mut self) -> Result<&mut Self, SubscribeError> {
124        self.subscribe(build_system_event_source::<E>()).await?;
125        Ok(self)
126    }
127}
128
129/// Marker trait for messages that are system events.
130pub trait DecodeEvent: Default + prost::Message + prost::Name {}
131impl<T> DecodeEvent for T where T: Default + prost::Message + prost::Name {}
132
/// Conversion trait for objects that can be turned into pubsub subscribers.
#[async_trait::async_trait]
pub trait IntoSubscriber {
    /// Consumes `self` and asynchronously produces a value implementing [`Subscribe`].
    ///
    /// # Errors
    ///
    /// Returns a [`SubscribeError`] when the conversion fails.
    async fn into_subscriber(self) -> Result<impl Subscribe, SubscribeError>;
}
138
/// Conversion trait for objects that can be turned into pubsub publishers.
#[async_trait::async_trait]
pub trait IntoPublisher {
    /// Consumes `self` and asynchronously produces a value implementing [`Publish`].
    ///
    /// # Errors
    ///
    /// Returns a [`PublishError`] when the conversion fails.
    async fn into_publisher(self) -> Result<impl Publish, PublishError>;
}
143}