// celp_sdk/async_broker/interface.rs
use futures::Stream;
use std::{borrow::Borrow, result::Result};

use crate::{
    async_cache,
    protobuf::{
        build_system_event_source, pack_system_event, system_event::Severity as ProstSeverity,
    },
};
10
/// A broker message: the topic it is associated with plus its raw payload.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so messages can be logged,
/// duplicated and compared (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Message {
    /// Name of the topic the message is associated with.
    pub topic: String,

    /// Raw payload bytes; this type does not interpret them.
    pub data: Vec<u8>,
}
21
/// Severity level passed to [`Publish::publish_event`].
///
/// The derived ordering follows declaration order, i.e.
/// `Info < Warning < Error < Fatal`. `Hash` is derived as well so the value
/// can be used as a key in hashed collections.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum Severity {
    /// Lowest level in the derived ordering.
    Info,
    /// Orders above `Info` and below `Error`.
    Warning,
    /// Orders above `Warning` and below `Fatal`.
    Error,
    /// Highest level in the derived ordering.
    Fatal,
}
30
31impl From<Severity> for ProstSeverity {
32 fn from(value: Severity) -> Self {
33 match value {
34 Severity::Info => ProstSeverity::Info,
35 Severity::Warning => ProstSeverity::Warning,
36 Severity::Error => ProstSeverity::Error,
37 Severity::Fatal => ProstSeverity::Fatal,
38 }
39 }
40}
41
42#[derive(thiserror::Error, Debug)]
43pub enum SubscribeError {
44 #[error("broker error")]
45 BrokerError(#[from] redis::RedisError),
46}
47
48#[derive(thiserror::Error, Debug)]
49pub enum PublishError {
50 #[error("cache error")]
51 CacheError(#[from] async_cache::CacheError),
52 #[error("serialization error")]
53 EncodeError(#[from] prost::EncodeError),
54 #[error("broker error")]
55 BrokerError(#[from] redis::RedisError),
56}
57
58#[derive(thiserror::Error, Debug)]
59#[error(transparent)]
60pub struct DecodeError(#[from] prost::DecodeError);
61
62#[async_trait::async_trait]
66pub trait Publish: Send {
67 async fn publish_raw(
69 &mut self,
70 topic: &str,
71 msg: &[u8],
72 store_in_cache: bool,
73 ) -> Result<(), PublishError>;
74
75 async fn publish<M: prost::Message + 'static>(
77 &mut self,
78 topic: &str,
79 msg: M,
80 ) -> Result<(), PublishError> {
81 let buf = msg.encode_to_vec();
82 self.publish_raw(topic, &buf, false).await
83 }
84
85 async fn publish_event<E: prost::Message + prost::Name + 'static>(
89 &mut self,
90 event_details: E,
91 severity: Severity,
92 ) -> Result<(), PublishError> {
93 use prost::Message;
94
95 let event = pack_system_event(&event_details, severity.into())?;
96 let buf = event.encode_to_vec();
97 self.publish_raw(&event.source, &buf, true).await
98 }
99}
100
101#[async_trait::async_trait]
105pub trait Subscribe: Send {
106 async fn subscribe(
108 &mut self,
109 topic: impl Borrow<str> + Send,
110 ) -> Result<&mut Self, SubscribeError>;
111
112 async fn subscribe_pattern(
114 &mut self,
115 pattern: impl Borrow<str> + Send,
116 ) -> Result<&mut Self, SubscribeError>;
117
118 async fn into_message_stream(self) -> impl Stream<Item = Message> + Send + Sync + Unpin;
121
122 async fn subscribe_event<E: DecodeEvent>(&mut self) -> Result<&mut Self, SubscribeError> {
124 self.subscribe(build_system_event_source::<E>()).await?;
125 Ok(self)
126 }
127}
128
129pub trait DecodeEvent: Default + prost::Message + prost::Name {}
131impl<T> DecodeEvent for T where T: Default + prost::Message + prost::Name {}
132
133#[async_trait::async_trait]
135pub trait IntoSubscriber {
136 async fn into_subscriber(self) -> Result<impl Subscribe, SubscribeError>;
137}
138
139#[async_trait::async_trait]
141pub trait IntoPublisher {
142 async fn into_publisher(self) -> Result<impl Publish, PublishError>;
143}