Skip to main content

ruststream_fred/testing/
subscriber.rs

1//! [`RedisTestSubscriber`] and [`RedisTestMessage`].
2//!
3//! The subscriber wraps a [`router::DeliveryReceiver`] and yields one [`RedisTestMessage`] per
4//! delivery. Dropping the subscriber unregisters its subscription from the underlying
5//! [`router::KeyRouter`], so handlers stop receiving messages as soon as their task finishes.
6
7use std::sync::{Arc, OnceLock};
8use std::task::Poll;
9
10use futures::Stream;
11use ruststream::{AckError, BatchSubscriber, Headers, IncomingMessage, Partitioned, Subscriber};
12
13use crate::{
14    error::RedisError,
15    testing::{
16        broker::TestBrokerState,
17        router::{Delivery, DeliveryReceiver, DeliverySender, SubscriptionId},
18    },
19};
20
21/// Subscriber returned by [`crate::testing::RedisTestBroker::subscribe`].
22pub struct RedisTestSubscriber {
23    state: Arc<TestBrokerState>,
24    id: SubscriptionId,
25    rx: DeliveryReceiver,
26    requeue: DeliverySender,
27}
28
29impl std::fmt::Debug for RedisTestSubscriber {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        f.debug_struct("RedisTestSubscriber")
32            .finish_non_exhaustive()
33    }
34}
35
36impl RedisTestSubscriber {
37    pub(crate) fn new(
38        state: Arc<TestBrokerState>,
39        id: SubscriptionId,
40        rx: DeliveryReceiver,
41        requeue: DeliverySender,
42    ) -> Self {
43        Self {
44            state,
45            id,
46            rx,
47            requeue,
48        }
49    }
50}
51
52impl Drop for RedisTestSubscriber {
53    fn drop(&mut self) {
54        self.state.router.unsubscribe(self.id);
55    }
56}
57
58impl Subscriber for RedisTestSubscriber {
59    type Message = RedisTestMessage;
60    type Error = RedisError;
61
62    fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
63        let requeue = self.requeue.clone();
64        // Poll the receiver in place rather than wrapping it in an owning stream, so `stream` can
65        // be called again after the returned stream is dropped (the runtime and the conformance
66        // helpers re-enter it per call).
67        futures::stream::poll_fn(move |cx| {
68            self.rx.poll_recv(cx).map(|next| {
69                next.map(|delivery| {
70                    Ok(RedisTestMessage {
71                        delivery: Some(delivery),
72                        requeue: requeue.clone(),
73                    })
74                })
75            })
76        })
77    }
78}
79
80/// Message handed to handlers from a [`RedisTestSubscriber`].
81///
82/// `ack` consumes the handle silently; `nack(requeue = true)` re-queues the delivery on the owning
83/// subscription's channel so the next handler invocation sees it again (matching the republish
84/// model the real broker uses); `nack(requeue = false)` drops it.
85pub struct RedisTestMessage {
86    delivery: Option<Delivery>,
87    requeue: DeliverySender,
88}
89
90impl std::fmt::Debug for RedisTestMessage {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        f.debug_struct("RedisTestMessage")
93            .field(
94                "subject",
95                &self.delivery.as_ref().map(|d| d.subject.as_str()),
96            )
97            .finish_non_exhaustive()
98    }
99}
100
101impl RedisTestMessage {
102    pub(crate) fn from_delivery(delivery: Delivery, requeue: DeliverySender) -> Self {
103        Self {
104            delivery: Some(delivery),
105            requeue,
106        }
107    }
108
109    /// Returns the stream key this message was published to.
110    #[must_use]
111    pub fn subject(&self) -> &str {
112        self.delivery
113            .as_ref()
114            .map(|d| d.subject.as_str())
115            .unwrap_or_default()
116    }
117}
118
119impl Partitioned for RedisTestMessage {
120    fn partition_key(&self) -> Option<&[u8]> {
121        self.headers().get(crate::PARTITION_KEY_HEADER)
122    }
123}
124
125impl IncomingMessage for RedisTestMessage {
126    fn payload(&self) -> &[u8] {
127        self.delivery
128            .as_ref()
129            .map(|d| d.payload.as_ref())
130            .unwrap_or_default()
131    }
132
133    fn headers(&self) -> &Headers {
134        static EMPTY: OnceLock<Headers> = OnceLock::new();
135        self.delivery
136            .as_ref()
137            .map_or_else(|| EMPTY.get_or_init(Headers::new), |d| &d.headers)
138    }
139
140    async fn ack(mut self) -> Result<(), AckError> {
141        self.delivery.take();
142        Ok(())
143    }
144
145    async fn nack(mut self, requeue: bool) -> Result<(), AckError> {
146        let delivery = self
147            .delivery
148            .take()
149            .expect("RedisTestMessage ack/nack invoked twice");
150        if requeue {
151            let _ = self.requeue.send(delivery);
152        }
153        Ok(())
154    }
155}
156
157/// Max messages drained per batch on the testing subscriber (bounds one synchronous drain without
158/// blocking on more arrivals).
159const TEST_BATCH_LIMIT: usize = 256;
160
161impl BatchSubscriber for RedisTestSubscriber {
162    type Batch = Vec<RedisTestMessage>;
163
164    /// Drains whatever is already buffered in the subscriber's channel (at least one, at most
165    /// [`TEST_BATCH_LIMIT`] messages). Blocks until the first message arrives.
166    fn batches(&mut self) -> impl Stream<Item = Result<Self::Batch, Self::Error>> + Send + '_ {
167        let requeue = self.requeue.clone();
168        futures::stream::poll_fn(move |cx| {
169            let first = match self.rx.poll_recv(cx) {
170                Poll::Pending => return Poll::Pending,
171                Poll::Ready(None) => return Poll::Ready(None),
172                Poll::Ready(Some(d)) => RedisTestMessage::from_delivery(d, requeue.clone()),
173            };
174            let mut batch = vec![first];
175            while batch.len() < TEST_BATCH_LIMIT {
176                match self.rx.poll_recv(cx) {
177                    Poll::Ready(Some(d)) => {
178                        batch.push(RedisTestMessage::from_delivery(d, requeue.clone()));
179                    }
180                    Poll::Ready(None) | Poll::Pending => break,
181                }
182            }
183            Poll::Ready(Some(Ok(batch)))
184        })
185    }
186}