Skip to main content

ruststream_fred/testing/
publisher.rs

1//! [`RedisTestPublisher`]: `Publisher` + `TransactionalPublisher` on top of the in-memory router.
2
3use std::sync::{Arc, Mutex};
4
5use bytes::Bytes;
6use ruststream::{Headers, OutgoingMessage, Publisher, TransactionalPublisher};
7
8use crate::{
9    error::RedisError,
10    testing::broker::{TestBrokerState, validate_publish_key},
11};
12
13/// One buffered publish (key, payload, headers), held while a transaction is open.
14type Buffered = (String, Bytes, Headers);
15
16/// Publisher returned by [`crate::testing::RedisTestBroker::publisher`].
17///
18/// Mirrors the real publisher's transaction surface: messages published inside a transaction are
19/// buffered and only fan out on [`commit`](TransactionalPublisher::commit) (in publish order), or
20/// are discarded on [`abort`](TransactionalPublisher::abort).
21#[derive(Clone)]
22pub struct RedisTestPublisher {
23    state: Arc<TestBrokerState>,
24    txn: Arc<Mutex<Option<Vec<Buffered>>>>,
25}
26
27impl std::fmt::Debug for RedisTestPublisher {
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        f.debug_struct("RedisTestPublisher").finish_non_exhaustive()
30    }
31}
32
33impl RedisTestPublisher {
34    pub(crate) fn new(state: Arc<TestBrokerState>) -> Self {
35        Self {
36            state,
37            txn: Arc::new(Mutex::new(None)),
38        }
39    }
40
41    fn buffer_if_in_txn(&self, entry: &Buffered) -> bool {
42        let mut guard = self
43            .txn
44            .lock()
45            .expect("redis test publisher mutex poisoned");
46        let buffered = guard.as_mut().is_some_and(|buffer| {
47            buffer.push(entry.clone());
48            true
49        });
50        drop(guard);
51        buffered
52    }
53}
54
55impl Publisher for RedisTestPublisher {
56    type Error = RedisError;
57
58    async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
59        validate_publish_key(msg.name())?;
60        let entry: Buffered = (
61            msg.name().to_owned(),
62            Bytes::copy_from_slice(msg.payload()),
63            msg.headers().clone(),
64        );
65        if self.buffer_if_in_txn(&entry) {
66            return Ok(());
67        }
68        let (key, payload, headers) = entry;
69        self.state
70            .router
71            .publish(key, payload, headers, self.state.coordinator().as_ref());
72        Ok(())
73    }
74}
75
76impl TransactionalPublisher for RedisTestPublisher {
77    async fn begin_transaction(&self) -> Result<(), Self::Error> {
78        let mut guard = self
79            .txn
80            .lock()
81            .expect("redis test publisher mutex poisoned");
82        if guard.is_none() {
83            *guard = Some(Vec::new());
84        }
85        drop(guard);
86        Ok(())
87    }
88
89    async fn commit(&self) -> Result<(), Self::Error> {
90        let buffered = self
91            .txn
92            .lock()
93            .expect("redis test publisher mutex poisoned")
94            .take();
95        for (key, payload, headers) in buffered.into_iter().flatten() {
96            self.state
97                .router
98                .publish(key, payload, headers, self.state.coordinator().as_ref());
99        }
100        Ok(())
101    }
102
103    async fn abort(&self) -> Result<(), Self::Error> {
104        self.txn
105            .lock()
106            .expect("redis test publisher mutex poisoned")
107            .take();
108        Ok(())
109    }
110}