ruststream_fred/
publisher.rs1use std::fmt::{Debug, Formatter};
4use std::sync::{Arc, Mutex};
5
6use fred::clients::Pool;
7use fred::interfaces::StreamsInterface;
8use ruststream::{OutgoingMessage, Publisher, TransactionalPublisher};
9use tokio::sync::OnceCell;
10
11use crate::{convert::fields_for_publish, error::RedisError};
12
13type Buffered = (String, Vec<(String, Vec<u8>)>);
15
16#[derive(Clone)]
36pub struct RedisPublisher {
37 pool: Arc<OnceCell<Pool>>,
38 transactions_supported: bool,
39 txn: Arc<Mutex<Option<Vec<Buffered>>>>,
40}
41
42impl Debug for RedisPublisher {
43 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("RedisPublisher")
45 .field("transactions_supported", &self.transactions_supported)
46 .finish_non_exhaustive()
47 }
48}
49
50impl RedisPublisher {
51 pub(crate) fn new(pool: Arc<OnceCell<Pool>>, transactions_supported: bool) -> Self {
52 Self {
53 pool,
54 transactions_supported,
55 txn: Arc::new(Mutex::new(None)),
56 }
57 }
58
59 pub(crate) fn pool(&self) -> Result<Pool, RedisError> {
60 self.pool.get().cloned().ok_or(RedisError::NotConnected)
61 }
62
63 fn buffer_if_in_txn(&self, entry: &Buffered) -> bool {
66 let mut guard = self.txn.lock().expect("redis publisher mutex poisoned");
67 let buffered = guard.as_mut().is_some_and(|buffer| {
68 buffer.push(entry.clone());
69 true
70 });
71 drop(guard);
72 buffered
73 }
74}
75
76impl Publisher for RedisPublisher {
77 type Error = RedisError;
78
79 async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
80 let entry: Buffered = (
81 msg.name().to_owned(),
82 fields_for_publish(msg.payload(), msg.headers()),
83 );
84 if self.buffer_if_in_txn(&entry) {
85 return Ok(());
86 }
87 let pool = self.pool()?;
88 let (key, fields) = entry;
89 let _: String = pool
90 .xadd(key, false, None::<()>, "*", fields)
91 .await
92 .map_err(RedisError::publish)?;
93 Ok(())
94 }
95}
96
97impl TransactionalPublisher for RedisPublisher {
98 async fn begin_transaction(&self) -> Result<(), Self::Error> {
105 if !self.transactions_supported {
106 return Err(RedisError::InvalidOptions(
107 "transactions are only supported on standalone and sentinel topologies".to_owned(),
108 ));
109 }
110 let mut guard = self.txn.lock().expect("redis publisher mutex poisoned");
111 if guard.is_none() {
112 *guard = Some(Vec::new());
113 }
114 drop(guard);
115 Ok(())
116 }
117
118 async fn commit(&self) -> Result<(), Self::Error> {
126 let buffered = self
127 .txn
128 .lock()
129 .expect("redis publisher mutex poisoned")
130 .take();
131 let Some(buffered) = buffered else {
132 return Ok(());
133 };
134 if buffered.is_empty() {
135 return Ok(());
136 }
137 let pool = self.pool()?;
138 let pipeline = pool.next().pipeline();
139 for (key, fields) in buffered {
140 let _: () = pipeline
141 .xadd(key, false, None::<()>, "*", fields)
142 .await
143 .map_err(RedisError::publish)?;
144 }
145 let _: Vec<fred::types::Value> = pipeline.all().await.map_err(RedisError::publish)?;
146 Ok(())
147 }
148
149 async fn abort(&self) -> Result<(), Self::Error> {
151 self.txn
152 .lock()
153 .expect("redis publisher mutex poisoned")
154 .take();
155 Ok(())
156 }
157}