Skip to main content

ruststream_fred/testing/
broker.rs

//! [`RedisTestBroker`]: `Broker` implementation backed by the in-process handler-stub dispatcher.
2
3use std::{sync::Arc, time::Duration};
4
5use ruststream::{Broker, DescribeServer, RawMessage, ServerSpec, Subscribe};
6
7use crate::{
8    error::RedisError,
9    testing::{RedisTestPublisher, RedisTestSubscriber, router::KeyRouter},
10};
11
12/// Shared state owned by every clone of a single test broker instance.
13///
14/// Cloning [`RedisTestBroker`] clones an [`Arc`] of this; all clones see the same router and
15/// therefore the same set of subscriptions. Distinct instances (different [`RedisTestBroker::new`]
16/// calls) are fully isolated.
17#[derive(Default)]
18pub(crate) struct TestBrokerState {
19    pub(crate) router: KeyRouter,
20}
21
22impl std::fmt::Debug for TestBrokerState {
23    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
24        f.debug_struct("TestBrokerState")
25            .field("router", &self.router)
26            .finish()
27    }
28}
29
30/// In-process Redis broker used for handler-level tests.
31///
32/// `publish` matches stream keys exactly (Redis Streams have no wildcard subjects) and hands the
33/// message to every matching subscriber's channel; `ack`/`nack(requeue = false)` consume the
34/// delivery and `nack(requeue = true)` re-sends it to the same subscriber's queue.
35///
36/// Broker-specific edge cases (consumer-group cursors, `XAUTOCLAIM` redelivery, idle reclaim,
37/// `MAXLEN` trimming, dead-letter routing) are intentionally NOT simulated. Use a real Redis server
38/// for those scenarios.
39#[derive(Clone, Default, Debug)]
40pub struct RedisTestBroker {
41    state: Arc<TestBrokerState>,
42}
43
44impl RedisTestBroker {
45    /// Constructs a fresh, isolated test broker. Equivalent to [`Self::default`].
46    #[must_use]
47    pub fn new() -> Self {
48        Self::default()
49    }
50
51    pub(crate) fn state(&self) -> &Arc<TestBrokerState> {
52        &self.state
53    }
54
55    /// Opens a subscription on the stream `key`. Mirrors the public surface of
56    /// [`crate::RedisBroker::subscribe`]; in handler-stub mode only the key is used for routing
57    /// (no consumer-group bookkeeping).
58    ///
59    /// # Errors
60    ///
61    /// Returns [`RedisError::Subscribe`] when `key` is empty.
62    #[allow(
63        clippy::unused_async,
64        reason = "API parity with RedisBroker::subscribe"
65    )]
66    pub async fn subscribe(
67        &self,
68        key: impl Into<String>,
69    ) -> Result<RedisTestSubscriber, RedisError> {
70        let key = key.into();
71        validate_key(&key).map_err(RedisError::Subscribe)?;
72        let (id, requeue, rx) = self.state.router.subscribe(key);
73        Ok(RedisTestSubscriber::new(
74            Arc::clone(&self.state),
75            id,
76            rx,
77            requeue,
78        ))
79    }
80
81    /// Returns a publisher bound to this broker. Cheap to clone.
82    #[must_use]
83    pub fn publisher(&self) -> RedisTestPublisher {
84        RedisTestPublisher::new(Arc::clone(&self.state))
85    }
86
87    /// Awaits until `count` messages have landed on `key` (or the timeout elapses) and returns the
88    /// recorded prefix of the published log. Returns whatever is recorded on timeout, never
89    /// blocking past it.
90    pub async fn expect_published(
91        &self,
92        key: &str,
93        count: usize,
94        timeout_dur: Duration,
95    ) -> Vec<RawMessage> {
96        self.state
97            .router
98            .expect_published(key, count, timeout_dur)
99            .await
100    }
101}
102
103impl Broker for RedisTestBroker {
104    type Error = RedisError;
105
106    async fn connect(&self) -> Result<(), Self::Error> {
107        Ok(())
108    }
109
110    async fn shutdown(&self) -> Result<(), Self::Error> {
111        self.state.router.clear();
112        Ok(())
113    }
114}
115
116#[allow(clippy::use_self)]
117impl Subscribe for RedisTestBroker {
118    type Subscriber = RedisTestSubscriber;
119
120    async fn subscribe(&self, name: &str) -> Result<Self::Subscriber, Self::Error> {
121        RedisTestBroker::subscribe(self, name).await
122    }
123}
124
/// Boxed, thread-safe error object used to carry key-validation failures.
type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Checks that `key` can name a stream: any non-empty string passes.
///
/// # Errors
///
/// Returns a boxed error with the message `stream key must be non-empty` when `key` is empty.
fn validate_key(key: &str) -> Result<(), BoxError> {
    match key {
        "" => Err("stream key must be non-empty".into()),
        _ => Ok(()),
    }
}
134
135/// Validates that `key` is publishable, converting a failure into [`RedisError::Publish`].
136pub(crate) fn validate_publish_key(key: &str) -> Result<(), RedisError> {
137    validate_key(key).map_err(RedisError::Publish)
138}
139
140impl DescribeServer for RedisTestBroker {
141    fn describe_server(&self) -> ServerSpec {
142        // The in-process broker has no real server; report a well-known in-memory address.
143        ServerSpec::new("in-process", "redis")
144    }
145}