ruststream_fred/testing/
subscriber.rs1use std::sync::{Arc, OnceLock};
8use std::task::Poll;
9
10use futures::Stream;
11use ruststream::{AckError, BatchSubscriber, Headers, IncomingMessage, Partitioned, Subscriber};
12
13use crate::{
14 error::RedisError,
15 testing::{
16 broker::TestBrokerState,
17 router::{Delivery, DeliveryReceiver, DeliverySender, SubscriptionId},
18 },
19};
20
21pub struct RedisTestSubscriber {
23 state: Arc<TestBrokerState>,
24 id: SubscriptionId,
25 rx: DeliveryReceiver,
26 requeue: DeliverySender,
27}
28
29impl std::fmt::Debug for RedisTestSubscriber {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 f.debug_struct("RedisTestSubscriber")
32 .finish_non_exhaustive()
33 }
34}
35
36impl RedisTestSubscriber {
37 pub(crate) fn new(
38 state: Arc<TestBrokerState>,
39 id: SubscriptionId,
40 rx: DeliveryReceiver,
41 requeue: DeliverySender,
42 ) -> Self {
43 Self {
44 state,
45 id,
46 rx,
47 requeue,
48 }
49 }
50}
51
52impl Drop for RedisTestSubscriber {
53 fn drop(&mut self) {
54 self.state.router.unsubscribe(self.id);
55 }
56}
57
58impl Subscriber for RedisTestSubscriber {
59 type Message = RedisTestMessage;
60 type Error = RedisError;
61
62 fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
63 let requeue = self.requeue.clone();
64 futures::stream::poll_fn(move |cx| {
68 self.rx.poll_recv(cx).map(|next| {
69 next.map(|delivery| {
70 Ok(RedisTestMessage {
71 delivery: Some(delivery),
72 requeue: requeue.clone(),
73 })
74 })
75 })
76 })
77 }
78}
79
80pub struct RedisTestMessage {
86 delivery: Option<Delivery>,
87 requeue: DeliverySender,
88}
89
90impl std::fmt::Debug for RedisTestMessage {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 f.debug_struct("RedisTestMessage")
93 .field(
94 "subject",
95 &self.delivery.as_ref().map(|d| d.subject.as_str()),
96 )
97 .finish_non_exhaustive()
98 }
99}
100
101impl RedisTestMessage {
102 pub(crate) fn from_delivery(delivery: Delivery, requeue: DeliverySender) -> Self {
103 Self {
104 delivery: Some(delivery),
105 requeue,
106 }
107 }
108
109 #[must_use]
111 pub fn subject(&self) -> &str {
112 self.delivery
113 .as_ref()
114 .map(|d| d.subject.as_str())
115 .unwrap_or_default()
116 }
117}
118
119impl Partitioned for RedisTestMessage {
120 fn partition_key(&self) -> Option<&[u8]> {
121 self.headers().get(crate::PARTITION_KEY_HEADER)
122 }
123}
124
125impl IncomingMessage for RedisTestMessage {
126 fn payload(&self) -> &[u8] {
127 self.delivery
128 .as_ref()
129 .map(|d| d.payload.as_ref())
130 .unwrap_or_default()
131 }
132
133 fn headers(&self) -> &Headers {
134 static EMPTY: OnceLock<Headers> = OnceLock::new();
135 self.delivery
136 .as_ref()
137 .map_or_else(|| EMPTY.get_or_init(Headers::new), |d| &d.headers)
138 }
139
140 async fn ack(mut self) -> Result<(), AckError> {
141 self.delivery.take();
142 Ok(())
143 }
144
145 async fn nack(mut self, requeue: bool) -> Result<(), AckError> {
146 let delivery = self
147 .delivery
148 .take()
149 .expect("RedisTestMessage ack/nack invoked twice");
150 if requeue {
151 let _ = self.requeue.send(delivery);
152 }
153 Ok(())
154 }
155}
156
/// Upper bound on the number of messages collected into one batch by
/// `RedisTestSubscriber::batches`.
const TEST_BATCH_LIMIT: usize = 256;
160
161impl BatchSubscriber for RedisTestSubscriber {
162 type Batch = Vec<RedisTestMessage>;
163
164 fn batches(&mut self) -> impl Stream<Item = Result<Self::Batch, Self::Error>> + Send + '_ {
167 let requeue = self.requeue.clone();
168 futures::stream::poll_fn(move |cx| {
169 let first = match self.rx.poll_recv(cx) {
170 Poll::Pending => return Poll::Pending,
171 Poll::Ready(None) => return Poll::Ready(None),
172 Poll::Ready(Some(d)) => RedisTestMessage::from_delivery(d, requeue.clone()),
173 };
174 let mut batch = vec![first];
175 while batch.len() < TEST_BATCH_LIMIT {
176 match self.rx.poll_recv(cx) {
177 Poll::Ready(Some(d)) => {
178 batch.push(RedisTestMessage::from_delivery(d, requeue.clone()));
179 }
180 Poll::Ready(None) | Poll::Pending => break,
181 }
182 }
183 Poll::Ready(Some(Ok(batch)))
184 })
185 }
186}