Skip to main content

ruststream_fred/testing/
broker.rs

1//! [`RedisTestBroker`]: a full `Broker` + `Subscribe` + `DescribeServer` backed by the in-process
2//! key router, which also implements [`TestableBroker`](ruststream::testing::TestableBroker) so the
3//! same type drives both the [`TestApp`](ruststream::testing::TestApp) harness and the conformance
4//! suite.
5
6use std::sync::{Arc, OnceLock};
7
8use bytes::Bytes;
9use ruststream::{
10    Broker, DescribeServer, OutgoingMessage, RawMessage, ServerSpec, Subscribe,
11    testing::{Coordinator, TestableBroker},
12};
13
14use crate::{
15    error::RedisError,
16    testing::{RedisTestPublisher, RedisTestSubscriber, router::KeyRouter},
17};
18
19/// Shared state owned by every clone of a single test broker instance.
20///
21/// Cloning [`RedisTestBroker`] clones an [`Arc`] of this; all clones see the same router and
22/// therefore the same set of subscriptions. Distinct instances (different [`RedisTestBroker::new`]
23/// calls) are fully isolated.
24#[derive(Default)]
25pub(crate) struct TestBrokerState {
26    pub(crate) router: KeyRouter,
27    /// The harness's quiescence-and-recording coordinator, installed by a
28    /// [`TestApp`](ruststream::testing::TestApp) run. Empty in production and under the conformance
29    /// suite, so fanout does no extra work.
30    coordinator: OnceLock<Coordinator>,
31}
32
33impl TestBrokerState {
34    /// Installs the harness coordinator for a [`TestApp`](ruststream::testing::TestApp) run.
35    /// Idempotent: a second install on the same broker is ignored.
36    pub(crate) fn install_coordinator(&self, coordinator: Coordinator) {
37        let _ = self.coordinator.set(coordinator);
38    }
39
40    /// A clone of the installed coordinator, threaded into each subscriber, delivery, and publish so
41    /// a requeue can re-count and a consumed delivery can decrement. `None` outside a harness run.
42    pub(crate) fn coordinator(&self) -> Option<Coordinator> {
43        self.coordinator.get().cloned()
44    }
45}
46
47impl std::fmt::Debug for TestBrokerState {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        f.debug_struct("TestBrokerState")
50            .field("router", &self.router)
51            .finish_non_exhaustive()
52    }
53}
54
55/// In-process Redis broker used for handler-level tests.
56///
57/// `publish` matches stream keys exactly (Redis Streams have no wildcard subjects) and hands the
58/// message to every matching subscriber's channel; `ack`/`nack(requeue = false)` consume the
59/// delivery and `nack(requeue = true)` re-sends it to the same subscriber's queue.
60///
61/// Broker-specific edge cases (consumer-group cursors, `XAUTOCLAIM` redelivery, idle reclaim,
62/// `MAXLEN` trimming, dead-letter routing) are intentionally NOT simulated. Use a real Redis server
63/// for those scenarios.
64#[derive(Clone, Default, Debug)]
65pub struct RedisTestBroker {
66    state: Arc<TestBrokerState>,
67}
68
69impl RedisTestBroker {
70    /// Constructs a fresh, isolated test broker. Equivalent to [`Self::default`].
71    #[must_use]
72    pub fn new() -> Self {
73        Self::default()
74    }
75
76    /// Opens a subscription on the stream `key`. Mirrors the public surface of
77    /// [`crate::RedisBroker::subscribe`]; in handler-stub mode only the key is used for routing
78    /// (no consumer-group bookkeeping).
79    ///
80    /// # Errors
81    ///
82    /// Returns [`RedisError::Subscribe`] when `key` is empty.
83    #[allow(
84        clippy::unused_async,
85        reason = "API parity with RedisBroker::subscribe"
86    )]
87    pub async fn subscribe(
88        &self,
89        key: impl Into<String>,
90    ) -> Result<RedisTestSubscriber, RedisError> {
91        let key = key.into();
92        validate_key(&key).map_err(RedisError::Subscribe)?;
93        let (id, requeue, rx) = self.state.router.subscribe(key);
94        Ok(RedisTestSubscriber::new(
95            Arc::clone(&self.state),
96            id,
97            rx,
98            requeue,
99        ))
100    }
101
102    /// Returns a publisher bound to this broker. Cheap to clone.
103    #[must_use]
104    pub fn publisher(&self) -> RedisTestPublisher {
105        RedisTestPublisher::new(Arc::clone(&self.state))
106    }
107}
108
109impl Broker for RedisTestBroker {
110    type Error = RedisError;
111
112    async fn connect(&self) -> Result<(), Self::Error> {
113        Ok(())
114    }
115
116    async fn shutdown(&self) -> Result<(), Self::Error> {
117        self.state.router.clear();
118        Ok(())
119    }
120}
121
122#[allow(clippy::use_self)]
123impl Subscribe for RedisTestBroker {
124    type Subscriber = RedisTestSubscriber;
125
126    async fn subscribe(&self, name: &str) -> Result<Self::Subscriber, Self::Error> {
127        RedisTestBroker::subscribe(self, name).await
128    }
129}
130
131// --8<-- [start:testable]
132impl TestableBroker for RedisTestBroker {
133    fn install_coordinator(&self, coordinator: Coordinator) {
134        self.state.install_coordinator(coordinator);
135    }
136
137    fn inject(&self, message: OutgoingMessage<'_>) {
138        // Route synchronously through the broker's own fanout, bypassing subject validation: the
139        // harness injects as an external producer would, and the publish is recorded and counted
140        // like any other.
141        self.state.router.publish(
142            message.name().to_owned(),
143            Bytes::copy_from_slice(message.payload()),
144            message.headers().clone(),
145            self.state.coordinator().as_ref(),
146        );
147    }
148
149    fn published(&self, name: &str) -> Vec<RawMessage> {
150        self.state.router.published(name)
151    }
152}
153
154ruststream::register_testable_broker!(RedisTestBroker);
155// --8<-- [end:testable]
156
/// Boxed, thread-safe error used to carry a validation failure.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Checks that `key` can be used as a stream key.
///
/// The only requirement is that the key is not empty; an empty key yields an error whose message
/// is `stream key must be non-empty`.
fn validate_key(key: &str) -> Result<(), BoxError> {
    if key.is_empty() {
        Err("stream key must be non-empty".into())
    } else {
        Ok(())
    }
}
166
167/// Validates that `key` is publishable, converting a failure into [`RedisError::Publish`].
168pub(crate) fn validate_publish_key(key: &str) -> Result<(), RedisError> {
169    validate_key(key).map_err(RedisError::Publish)
170}
171
172impl DescribeServer for RedisTestBroker {
173    fn describe_server(&self) -> ServerSpec {
174        // The in-process broker has no real server; report itself as in-process over `redis`.
175        ServerSpec::in_process("redis")
176    }
177}