ruststream_fred/testing/
publisher.rs1use std::sync::{Arc, Mutex};
4
5use bytes::Bytes;
6use ruststream::{Headers, OutgoingMessage, Publisher, TransactionalPublisher};
7
8use crate::{
9 error::RedisError,
10 testing::broker::{TestBrokerState, validate_publish_key},
11};
12
13type Buffered = (String, Bytes, Headers);
15
16#[derive(Clone)]
22pub struct RedisTestPublisher {
23 state: Arc<TestBrokerState>,
24 txn: Arc<Mutex<Option<Vec<Buffered>>>>,
25}
26
27impl std::fmt::Debug for RedisTestPublisher {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 f.debug_struct("RedisTestPublisher").finish_non_exhaustive()
30 }
31}
32
33impl RedisTestPublisher {
34 pub(crate) fn new(state: Arc<TestBrokerState>) -> Self {
35 Self {
36 state,
37 txn: Arc::new(Mutex::new(None)),
38 }
39 }
40
41 fn buffer_if_in_txn(&self, entry: &Buffered) -> bool {
42 let mut guard = self
43 .txn
44 .lock()
45 .expect("redis test publisher mutex poisoned");
46 let buffered = guard.as_mut().is_some_and(|buffer| {
47 buffer.push(entry.clone());
48 true
49 });
50 drop(guard);
51 buffered
52 }
53}
54
55impl Publisher for RedisTestPublisher {
56 type Error = RedisError;
57
58 async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
59 validate_publish_key(msg.name())?;
60 let entry: Buffered = (
61 msg.name().to_owned(),
62 Bytes::copy_from_slice(msg.payload()),
63 msg.headers().clone(),
64 );
65 if self.buffer_if_in_txn(&entry) {
66 return Ok(());
67 }
68 let (key, payload, headers) = entry;
69 self.state.router.publish(key, payload, headers);
70 Ok(())
71 }
72}
73
74impl TransactionalPublisher for RedisTestPublisher {
75 async fn begin_transaction(&self) -> Result<(), Self::Error> {
76 let mut guard = self
77 .txn
78 .lock()
79 .expect("redis test publisher mutex poisoned");
80 if guard.is_none() {
81 *guard = Some(Vec::new());
82 }
83 drop(guard);
84 Ok(())
85 }
86
87 async fn commit(&self) -> Result<(), Self::Error> {
88 let buffered = self
89 .txn
90 .lock()
91 .expect("redis test publisher mutex poisoned")
92 .take();
93 for (key, payload, headers) in buffered.into_iter().flatten() {
94 self.state.router.publish(key, payload, headers);
95 }
96 Ok(())
97 }
98
99 async fn abort(&self) -> Result<(), Self::Error> {
100 self.txn
101 .lock()
102 .expect("redis test publisher mutex poisoned")
103 .take();
104 Ok(())
105 }
106}