Skip to main content

ruststream_fred/testing/
publisher.rs

1//! [`RedisTestPublisher`]: `Publisher` + `TransactionalPublisher` on top of the in-memory router.
2
3use std::sync::{Arc, Mutex};
4
5use bytes::Bytes;
6use ruststream::{Headers, OutgoingMessage, Publisher, TransactionalPublisher};
7
8use crate::{
9    error::RedisError,
10    testing::broker::{TestBrokerState, validate_publish_key},
11};
12
13/// One buffered publish (key, payload, headers), held while a transaction is open.
14type Buffered = (String, Bytes, Headers);
15
16/// Publisher returned by [`crate::testing::RedisTestBroker::publisher`].
17///
18/// Mirrors the real publisher's transaction surface: messages published inside a transaction are
19/// buffered and only fan out on [`commit`](TransactionalPublisher::commit) (in publish order), or
20/// are discarded on [`abort`](TransactionalPublisher::abort).
21#[derive(Clone)]
22pub struct RedisTestPublisher {
23    state: Arc<TestBrokerState>,
24    txn: Arc<Mutex<Option<Vec<Buffered>>>>,
25}
26
27impl std::fmt::Debug for RedisTestPublisher {
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        f.debug_struct("RedisTestPublisher").finish_non_exhaustive()
30    }
31}
32
33impl RedisTestPublisher {
34    pub(crate) fn new(state: Arc<TestBrokerState>) -> Self {
35        Self {
36            state,
37            txn: Arc::new(Mutex::new(None)),
38        }
39    }
40
41    fn buffer_if_in_txn(&self, entry: &Buffered) -> bool {
42        let mut guard = self
43            .txn
44            .lock()
45            .expect("redis test publisher mutex poisoned");
46        let buffered = guard.as_mut().is_some_and(|buffer| {
47            buffer.push(entry.clone());
48            true
49        });
50        drop(guard);
51        buffered
52    }
53}
54
55impl Publisher for RedisTestPublisher {
56    type Error = RedisError;
57
58    async fn publish(&self, msg: OutgoingMessage<'_>) -> Result<(), Self::Error> {
59        validate_publish_key(msg.name())?;
60        let entry: Buffered = (
61            msg.name().to_owned(),
62            Bytes::copy_from_slice(msg.payload()),
63            msg.headers().clone(),
64        );
65        if self.buffer_if_in_txn(&entry) {
66            return Ok(());
67        }
68        let (key, payload, headers) = entry;
69        self.state.router.publish(key, payload, headers);
70        Ok(())
71    }
72}
73
74impl TransactionalPublisher for RedisTestPublisher {
75    async fn begin_transaction(&self) -> Result<(), Self::Error> {
76        let mut guard = self
77            .txn
78            .lock()
79            .expect("redis test publisher mutex poisoned");
80        if guard.is_none() {
81            *guard = Some(Vec::new());
82        }
83        drop(guard);
84        Ok(())
85    }
86
87    async fn commit(&self) -> Result<(), Self::Error> {
88        let buffered = self
89            .txn
90            .lock()
91            .expect("redis test publisher mutex poisoned")
92            .take();
93        for (key, payload, headers) in buffered.into_iter().flatten() {
94            self.state.router.publish(key, payload, headers);
95        }
96        Ok(())
97    }
98
99    async fn abort(&self) -> Result<(), Self::Error> {
100        self.txn
101            .lock()
102            .expect("redis test publisher mutex poisoned")
103            .take();
104        Ok(())
105    }
106}