ruststream_fred/testing/
subscriber.rs1use std::sync::{Arc, OnceLock};
8use std::task::Poll;
9
10use futures::Stream;
11use ruststream::{
12 AckError, BatchSubscriber, Headers, IncomingMessage, Partitioned, Subscriber,
13 testing::Coordinator,
14};
15
16use crate::{
17 error::RedisError,
18 testing::{
19 broker::TestBrokerState,
20 router::{Delivery, DeliveryReceiver, DeliverySender, SubscriptionId},
21 },
22};
23
24pub struct RedisTestSubscriber {
26 state: Arc<TestBrokerState>,
27 id: SubscriptionId,
28 rx: DeliveryReceiver,
29 requeue: DeliverySender,
30 coordinator: Option<Coordinator>,
33}
34
35impl std::fmt::Debug for RedisTestSubscriber {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("RedisTestSubscriber")
38 .finish_non_exhaustive()
39 }
40}
41
42impl RedisTestSubscriber {
43 pub(crate) fn new(
44 state: Arc<TestBrokerState>,
45 id: SubscriptionId,
46 rx: DeliveryReceiver,
47 requeue: DeliverySender,
48 ) -> Self {
49 let coordinator = state.coordinator();
52 Self {
53 state,
54 id,
55 rx,
56 requeue,
57 coordinator,
58 }
59 }
60}
61
62impl Drop for RedisTestSubscriber {
63 fn drop(&mut self) {
64 self.state.router.unsubscribe(self.id);
65 }
66}
67
68impl Subscriber for RedisTestSubscriber {
69 type Message = RedisTestMessage;
70 type Error = RedisError;
71
72 fn stream(&mut self) -> impl Stream<Item = Result<Self::Message, Self::Error>> + Send + '_ {
73 let requeue = self.requeue.clone();
74 let coordinator = self.coordinator.clone();
75 futures::stream::poll_fn(move |cx| {
79 self.rx.poll_recv(cx).map(|next| {
80 next.map(|delivery| {
81 Ok(RedisTestMessage::from_delivery(
82 delivery,
83 requeue.clone(),
84 coordinator.clone(),
85 ))
86 })
87 })
88 })
89 }
90}
91
92pub struct RedisTestMessage {
98 delivery: Option<Delivery>,
99 requeue: DeliverySender,
100 coordinator: Option<Coordinator>,
104}
105
106impl Drop for RedisTestMessage {
107 fn drop(&mut self) {
111 if let Some(coordinator) = &self.coordinator {
112 coordinator.consumed();
113 }
114 }
115}
116
117impl std::fmt::Debug for RedisTestMessage {
118 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
119 f.debug_struct("RedisTestMessage")
120 .field(
121 "subject",
122 &self.delivery.as_ref().map(|d| d.subject.as_str()),
123 )
124 .finish_non_exhaustive()
125 }
126}
127
128impl RedisTestMessage {
129 pub(crate) fn from_delivery(
130 delivery: Delivery,
131 requeue: DeliverySender,
132 coordinator: Option<Coordinator>,
133 ) -> Self {
134 Self {
135 delivery: Some(delivery),
136 requeue,
137 coordinator,
138 }
139 }
140
141 #[must_use]
143 pub fn subject(&self) -> &str {
144 self.delivery
145 .as_ref()
146 .map(|d| d.subject.as_str())
147 .unwrap_or_default()
148 }
149}
150
151impl Partitioned for RedisTestMessage {
152 fn partition_key(&self) -> Option<&[u8]> {
153 self.headers().get(crate::PARTITION_KEY_HEADER)
154 }
155}
156
157impl IncomingMessage for RedisTestMessage {
158 fn payload(&self) -> &[u8] {
159 self.delivery
160 .as_ref()
161 .map(|d| d.payload.as_ref())
162 .unwrap_or_default()
163 }
164
165 fn headers(&self) -> &Headers {
166 static EMPTY: OnceLock<Headers> = OnceLock::new();
167 self.delivery
168 .as_ref()
169 .map_or_else(|| EMPTY.get_or_init(Headers::new), |d| &d.headers)
170 }
171
172 async fn ack(mut self) -> Result<(), AckError> {
173 self.delivery.take();
174 Ok(())
175 }
176
177 async fn nack(mut self, requeue: bool) -> Result<(), AckError> {
178 let delivery = self
179 .delivery
180 .take()
181 .expect("RedisTestMessage ack/nack invoked twice");
182 if requeue {
183 if self.requeue.send(delivery).is_ok()
187 && let Some(coordinator) = &self.coordinator
188 {
189 coordinator.enqueued();
190 }
191 }
192 Ok(())
193 }
194}
195
/// Upper bound on how many messages `batches` gathers into a single batch.
const TEST_BATCH_LIMIT: usize = 256;
199
200impl BatchSubscriber for RedisTestSubscriber {
201 type Batch = Vec<RedisTestMessage>;
202
203 fn batches(&mut self) -> impl Stream<Item = Result<Self::Batch, Self::Error>> + Send + '_ {
206 let requeue = self.requeue.clone();
207 let coordinator = self.coordinator.clone();
208 futures::stream::poll_fn(move |cx| {
209 let first = match self.rx.poll_recv(cx) {
210 Poll::Pending => return Poll::Pending,
211 Poll::Ready(None) => return Poll::Ready(None),
212 Poll::Ready(Some(d)) => {
213 RedisTestMessage::from_delivery(d, requeue.clone(), coordinator.clone())
214 }
215 };
216 let mut batch = vec![first];
217 while batch.len() < TEST_BATCH_LIMIT {
218 match self.rx.poll_recv(cx) {
219 Poll::Ready(Some(d)) => {
220 batch.push(RedisTestMessage::from_delivery(
221 d,
222 requeue.clone(),
223 coordinator.clone(),
224 ));
225 }
226 Poll::Ready(None) | Poll::Pending => break,
227 }
228 }
229 Poll::Ready(Some(Ok(batch)))
230 })
231 }
232}