// queuerious_lapin/consumer.rs

use std::sync::Arc;

use futures_util::StreamExt;
use queuerious::{Backend, JobEvent, QueuriousClient};

use crate::config::LapinAdapterConfig;
use crate::delivery::TrackedDelivery;

#[cfg(feature = "metrics")]
use std::collections::HashMap;

#[cfg(feature = "metrics")]
use crate::metrics::SlidingWindowCounters;
/// Wraps a [`lapin::Consumer`] so that each consumed delivery is reported
/// to a Queuerious backend as a "job started" event and handed back as a
/// [`TrackedDelivery`] for lifecycle tracking (ack/nack reporting).
pub struct TrackedConsumer {
    /// The underlying lapin consumer deliveries are pulled from.
    inner: lapin::Consumer,
    /// Client used to report job events to the Queuerious backend.
    client: Arc<QueuriousClient>,
    /// Queue name attached to every reported event and tracked delivery.
    queue_name: String,
    /// Adapter configuration: custom extractors, payload size limit,
    /// default job type, max attempts, etc.
    config: LapinAdapterConfig,
    /// Optional shared per-queue sliding-window counters, propagated onto
    /// each produced `TrackedDelivery` (metrics feature only).
    #[cfg(feature = "metrics")]
    counters: Option<Arc<HashMap<String, SlidingWindowCounters>>>,
}
63
impl TrackedConsumer {
    /// Creates a tracked consumer using the default [`LapinAdapterConfig`].
    ///
    /// Delegates to [`Self::with_config`].
    pub fn new(
        consumer: lapin::Consumer,
        queue_name: impl Into<String>,
        client: Arc<QueuriousClient>,
    ) -> Self {
        Self::with_config(consumer, queue_name, client, LapinAdapterConfig::default())
    }

    /// Creates a tracked consumer with an explicit adapter configuration.
    ///
    /// Metrics counters start unset; attach them with
    /// [`Self::with_counters`] when the `metrics` feature is enabled.
    pub fn with_config(
        consumer: lapin::Consumer,
        queue_name: impl Into<String>,
        client: Arc<QueuriousClient>,
        config: LapinAdapterConfig,
    ) -> Self {
        Self {
            inner: consumer,
            client,
            queue_name: queue_name.into(),
            config,
            #[cfg(feature = "metrics")]
            counters: None,
        }
    }

    /// Attaches shared sliding-window counters (builder style); they are
    /// cloned onto every `TrackedDelivery` produced by `next_tracked`.
    #[cfg(feature = "metrics")]
    pub fn with_counters(mut self, counters: Arc<HashMap<String, SlidingWindowCounters>>) -> Self {
        self.counters = Some(counters);
        self
    }

    /// Returns the attached counters, if any.
    #[cfg(feature = "metrics")]
    pub fn counters(&self) -> Option<&Arc<HashMap<String, SlidingWindowCounters>>> {
        self.counters.as_ref()
    }

    /// Pulls the next delivery from the underlying consumer, reports a
    /// "job started" event to Queuerious, and wraps the delivery in a
    /// [`TrackedDelivery`] carrying the execution id from the ingest
    /// response.
    ///
    /// Returns `None` when the underlying consumer stream ends, and
    /// `Some(Err(..))` when lapin yields a stream error. Event reporting
    /// is best-effort: if the report fails (or the response carries no
    /// results), the delivery is still returned, with a nil execution id
    /// so message processing is never blocked by the tracking backend.
    pub async fn next_tracked(&mut self) -> Option<Result<TrackedDelivery, TrackedConsumerError>> {
        let delivery = match self.inner.next().await {
            Some(Ok(d)) => d,
            Some(Err(e)) => return Some(Err(TrackedConsumerError::Lapin(e))),
            None => return None,
        };

        // Parse the body as JSON once; reused below for both the event
        // payload and the fallback job-id / correlation-id extraction.
        // `None` simply means the body was not valid JSON.
        let parsed_payload: Option<serde_json::Value> = serde_json::from_slice(&delivery.data).ok();

        // A configured custom extractor always wins; otherwise fall back
        // to the adapter's default extraction from headers/payload.
        let job_id = self
            .config
            .job_id_extractor
            .as_ref()
            .map(|f| f(&delivery))
            .unwrap_or_else(|| crate::config::extract_job_id(&delivery, parsed_payload.as_ref()));

        // Same precedence for the job type: custom extractor, then the
        // default derivation from properties / routing key / configured
        // default.
        let job_type = self
            .config
            .job_type_extractor
            .as_ref()
            .map(|f| f(&delivery))
            .unwrap_or_else(|| {
                crate::config::extract_job_type(
                    &delivery.properties,
                    &delivery.routing_key,
                    &self.config.default_job_type,
                )
            });

        // Event payload: oversized bodies are replaced with a small
        // placeholder object (so the event stays bounded), unparseable
        // bodies become JSON null, and payload capture can be disabled
        // entirely via config (empty object).
        let payload = if self.config.include_payload {
            if delivery.data.len() > self.config.max_payload_size {
                serde_json::json!({
                    "_queuerious": "payload_too_large",
                    "message": format!(
                        "Payload not stored ({} bytes exceeds {} byte limit)",
                        delivery.data.len(),
                        self.config.max_payload_size
                    ),
                    "original_size_bytes": delivery.data.len()
                })
            } else {
                // Clone is required: `parsed_payload` is reused below for
                // correlation-id extraction.
                parsed_payload.clone().unwrap_or(serde_json::Value::Null)
            }
        } else {
            serde_json::Value::Object(serde_json::Map::new())
        };

        // Metadata: custom extractor first; otherwise default metadata,
        // but only if it is a non-empty object — empty/non-object
        // metadata is dropped rather than attached.
        let metadata = self
            .config
            .metadata_extractor
            .as_ref()
            .map(|f| f(&delivery))
            .or_else(|| {
                if self.config.include_default_metadata {
                    let meta = crate::config::extract_default_metadata(&delivery);
                    if meta.as_object().is_none_or(|m| m.is_empty()) {
                        None
                    } else {
                        Some(meta)
                    }
                } else {
                    None
                }
            });

        let mut builder =
            JobEvent::started(&self.queue_name, Backend::RabbitMQ, &job_id, &job_type)
                .payload(payload);

        if let Some(meta) = metadata {
            builder = builder.metadata(meta);
        }

        // Correlation id: custom extractor may itself return None, in
        // which case the default extraction (headers/payload) is tried.
        let correlation_id = self
            .config
            .correlation_id_extractor
            .as_ref()
            .and_then(|f| f(&delivery))
            .or_else(|| crate::config::extract_correlation_id(&delivery, parsed_payload.as_ref()));

        if let Some(cid) = correlation_id {
            builder = builder.correlation_id(cid);
        }

        if let Some(max) = self.config.max_attempts {
            builder = builder.max_attempts(max);
        }

        // Broker retry count is zero-based retries; attempt number is
        // one-based, hence the +1.
        if let Some(retry_count) = crate::config::extract_retry_count(&delivery) {
            builder = builder.attempt_number(retry_count + 1);
        }

        let event = builder.build();

        // Shared construction path for both the success and the
        // report-failed fallback branches below; attaches metrics
        // counters when present.
        let make_tracked = |delivery, execution_id, created_at| {
            let tracked = TrackedDelivery::new(
                delivery,
                self.client.clone(),
                self.queue_name.clone(),
                execution_id,
                created_at,
            );

            #[cfg(feature = "metrics")]
            let tracked = if let Some(ref counters) = self.counters {
                tracked.with_counters(counters.clone())
            } else {
                tracked
            };

            tracked
        };

        match self.client.report_single(event).await {
            Ok(response) => {
                // An empty results list means lifecycle tracking is
                // unavailable for this delivery; fall back to a nil
                // execution id rather than failing the consume.
                let (execution_id, created_at) = if let Some(result) = response.results.first() {
                    (result.execution_id, result.created_at)
                } else {
                    tracing::warn!(
                        "Ingest response missing results; lifecycle tracking unavailable"
                    );
                    (uuid::Uuid::nil(), chrono::Utc::now())
                };

                Some(Ok(make_tracked(delivery, execution_id, created_at)))
            }
            Err(e) => {
                // Best-effort: a failed report must not block message
                // processing; hand the delivery through untracked.
                tracing::warn!(error = %e, "Failed to report job_started; proceeding without tracking");
                Some(Ok(make_tracked(
                    delivery,
                    uuid::Uuid::nil(),
                    chrono::Utc::now(),
                )))
            }
        }
    }
}
264
/// Errors yielded by [`TrackedConsumer::next_tracked`].
#[derive(Debug, thiserror::Error)]
pub enum TrackedConsumerError {
    /// An error surfaced by the underlying lapin consumer stream.
    #[error("Lapin error: {0}")]
    Lapin(#[from] lapin::Error),
}