Skip to main content

ruststream_fred/testing/
subscriber.rs

1//! [`RedisTestSubscriber`] and [`RedisTestMessage`].
2//!
3//! The subscriber wraps a [`router::DeliveryReceiver`] and yields one [`RedisTestMessage`] per
4//! delivery. Dropping the subscriber unregisters its subscription from the underlying
5//! [`router::KeyRouter`], so handlers stop receiving messages as soon as their task finishes.
6
7use std::sync::{Arc, OnceLock};
8use std::task::Poll;
9
10use futures::Stream;
11use ruststream::{
12    AckError, BatchSubscriber, Headers, IncomingMessage, Partitioned, Subscriber,
13    testing::Coordinator,
14};
15
16use crate::{
17    error::RedisError,
18    testing::{
19        broker::TestBrokerState,
20        router::{Delivery, DeliveryReceiver, DeliverySender, SubscriptionId},
21    },
22};
23
24/// Subscriber returned by [`crate::testing::RedisTestBroker::subscribe`].
25pub struct RedisTestSubscriber {
26    state: Arc<TestBrokerState>,
27    id: SubscriptionId,
28    rx: DeliveryReceiver,
29    requeue: DeliverySender,
30    /// A clone of the broker's harness coordinator, threaded into each yielded message so a requeue
31    /// re-counts and a consumed delivery decrements. `None` outside a harness run.
32    coordinator: Option<Coordinator>,
33}
34
35impl std::fmt::Debug for RedisTestSubscriber {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("RedisTestSubscriber")
38            .finish_non_exhaustive()
39    }
40}
41
42impl RedisTestSubscriber {
43    pub(crate) fn new(
44        state: Arc<TestBrokerState>,
45        id: SubscriptionId,
46        rx: DeliveryReceiver,
47        requeue: DeliverySender,
48    ) -> Self {
49        // The harness installs its coordinator before any subscription opens, so reading it here
50        // captures the live coordinator for the whole subscription.
51        let coordinator = state.coordinator();
52        Self {
53            state,
54            id,
55            rx,
56            requeue,
57            coordinator,
58        }
59    }
60}
61
62impl Drop for RedisTestSubscriber {
63    fn drop(&mut self) {
64        self.state.router.unsubscribe(self.id);
65    }
66}
67
68impl Subscriber for RedisTestSubscriber {
69    type Message = RedisTestMessage;
70    type Error = RedisError;
71
72    fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
73        let requeue = self.requeue.clone();
74        let coordinator = self.coordinator.clone();
75        // Poll the receiver in place rather than wrapping it in an owning stream, so `stream` can
76        // be called again after the returned stream is dropped (the runtime and the conformance
77        // helpers re-enter it per call).
78        futures::stream::poll_fn(move |cx| {
79            self.rx.poll_recv(cx).map(|next| {
80                next.map(|delivery| {
81                    Ok(RedisTestMessage::from_delivery(
82                        delivery,
83                        requeue.clone(),
84                        coordinator.clone(),
85                    ))
86                })
87            })
88        })
89    }
90}
91
92/// Message handed to handlers from a [`RedisTestSubscriber`].
93///
94/// `ack` consumes the handle silently; `nack(requeue = true)` re-queues the delivery on the owning
95/// subscription's channel so the next handler invocation sees it again (matching the republish
96/// model the real broker uses); `nack(requeue = false)` drops it.
97pub struct RedisTestMessage {
98    delivery: Option<Delivery>,
99    requeue: DeliverySender,
100    /// A clone of the broker's harness coordinator. When set, this delivery is counted in flight and
101    /// decremented exactly once when the message is consumed or dropped (see the `Drop` impl). `None`
102    /// outside a harness run.
103    coordinator: Option<Coordinator>,
104}
105
106impl Drop for RedisTestMessage {
107    /// Counts this delivery consumed exactly once: on ack, nack, or an unsettled drop (a fail-fast
108    /// panic). A requeue (`nack(true)`) re-enqueues a fresh delivery first, so the in-flight count
109    /// stays balanced across redelivery. `Drop` runs once per value, so the decrement is idempotent.
110    fn drop(&mut self) {
111        if let Some(coordinator) = &self.coordinator {
112            coordinator.consumed();
113        }
114    }
115}
116
117impl std::fmt::Debug for RedisTestMessage {
118    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119        f.debug_struct("RedisTestMessage")
120            .field(
121                "subject",
122                &self.delivery.as_ref().map(|d| d.subject.as_str()),
123            )
124            .finish_non_exhaustive()
125    }
126}
127
128impl RedisTestMessage {
129    pub(crate) fn from_delivery(
130        delivery: Delivery,
131        requeue: DeliverySender,
132        coordinator: Option<Coordinator>,
133    ) -> Self {
134        Self {
135            delivery: Some(delivery),
136            requeue,
137            coordinator,
138        }
139    }
140
141    /// Returns the stream key this message was published to.
142    #[must_use]
143    pub fn subject(&self) -> &str {
144        self.delivery
145            .as_ref()
146            .map(|d| d.subject.as_str())
147            .unwrap_or_default()
148    }
149}
150
151impl Partitioned for RedisTestMessage {
152    fn partition_key(&self) -> Option<&[u8]> {
153        self.headers().get(crate::PARTITION_KEY_HEADER)
154    }
155}
156
157impl IncomingMessage for RedisTestMessage {
158    fn payload(&self) -> &[u8] {
159        self.delivery
160            .as_ref()
161            .map(|d| d.payload.as_ref())
162            .unwrap_or_default()
163    }
164
165    fn headers(&self) -> &Headers {
166        static EMPTY: OnceLock<Headers> = OnceLock::new();
167        self.delivery
168            .as_ref()
169            .map_or_else(|| EMPTY.get_or_init(Headers::new), |d| &d.headers)
170    }
171
172    async fn ack(mut self) -> Result<(), AckError> {
173        self.delivery.take();
174        Ok(())
175    }
176
177    async fn nack(mut self, requeue: bool) -> Result<(), AckError> {
178        let delivery = self
179            .delivery
180            .take()
181            .expect("RedisTestMessage ack/nack invoked twice");
182        if requeue {
183            // The requeue bypasses `KeyRouter::publish`, so count the re-enqueue here to balance
184            // this message's `Drop` decrement. The redelivered copy is consumed (and decremented) in
185            // turn.
186            if self.requeue.send(delivery).is_ok()
187                && let Some(coordinator) = &self.coordinator
188            {
189                coordinator.enqueued();
190            }
191        }
192        Ok(())
193    }
194}
195
/// Upper bound on the number of messages the testing subscriber puts in one batch, so a single
/// synchronous drain stays bounded and never waits for further arrivals.
const TEST_BATCH_LIMIT: usize = 256;
199
200impl BatchSubscriber for RedisTestSubscriber {
201    type Batch = Vec<RedisTestMessage>;
202
203    /// Drains whatever is already buffered in the subscriber's channel (at least one, at most
204    /// [`TEST_BATCH_LIMIT`] messages). Blocks until the first message arrives.
205    fn batches(&mut self) -> impl Stream<Item = Result<Self::Batch, Self::Error>> + Send + '_ {
206        let requeue = self.requeue.clone();
207        let coordinator = self.coordinator.clone();
208        futures::stream::poll_fn(move |cx| {
209            let first = match self.rx.poll_recv(cx) {
210                Poll::Pending => return Poll::Pending,
211                Poll::Ready(None) => return Poll::Ready(None),
212                Poll::Ready(Some(d)) => {
213                    RedisTestMessage::from_delivery(d, requeue.clone(), coordinator.clone())
214                }
215            };
216            let mut batch = vec![first];
217            while batch.len() < TEST_BATCH_LIMIT {
218                match self.rx.poll_recv(cx) {
219                    Poll::Ready(Some(d)) => {
220                        batch.push(RedisTestMessage::from_delivery(
221                            d,
222                            requeue.clone(),
223                            coordinator.clone(),
224                        ));
225                    }
226                    Poll::Ready(None) | Poll::Pending => break,
227                }
228            }
229            Poll::Ready(Some(Ok(batch)))
230        })
231    }
232}