Skip to main content

queuerious_lapin/
consumer.rs

//! Tracked consumer wrapping `lapin::Consumer`.
2
3use std::sync::Arc;
4
5use futures_util::StreamExt;
6use queuerious::{Backend, JobEvent, QueuriousClient};
7
8use crate::config::LapinAdapterConfig;
9use crate::delivery::TrackedDelivery;
10
11#[cfg(feature = "metrics")]
12use std::collections::HashMap;
13
14#[cfg(feature = "metrics")]
15use crate::metrics::SlidingWindowCounters;
16
17/// A wrapped `lapin::Consumer` that automatically reports job lifecycle events.
18///
19/// Each delivery is wrapped in a [`TrackedDelivery`] that reports `job_started`
20/// when received (via a direct API call to get the server-assigned `execution_id`),
21/// and `job_completed`/`job_failed` when acked/nacked.
22///
23/// Use [`next_tracked()`](Self::next_tracked) to consume deliveries with full
24/// lifecycle tracking.
25///
26/// # Example
27///
28/// ```no_run
29/// use queuerious::QueuriousClient;
30/// use queuerious_lapin::TrackedConsumer;
31///
32/// # async fn example(channel: &lapin::Channel) -> Result<(), Box<dyn std::error::Error>> {
33/// let client = std::sync::Arc::new(
34///     QueuriousClient::builder()
35///         .api_key("qk_your_key")
36///         .build()?
37/// );
38///
39/// let consumer = channel.basic_consume(
40///     "my-queue", "my-consumer",
41///     lapin::options::BasicConsumeOptions::default(),
42///     lapin::types::FieldTable::default(),
43/// ).await?;
44///
45/// let mut tracked = TrackedConsumer::new(consumer, "my-queue", client);
46///
47/// while let Some(delivery) = tracked.next_tracked().await {
48///     let delivery = delivery?;
49///     // Process message...
50///     delivery.ack(lapin::options::BasicAckOptions::default()).await?;
51/// }
52/// # Ok(())
53/// # }
54/// ```
55pub struct TrackedConsumer {
56    inner: lapin::Consumer,
57    client: Arc<QueuriousClient>,
58    queue_name: String,
59    config: LapinAdapterConfig,
60    #[cfg(feature = "metrics")]
61    counters: Option<Arc<HashMap<String, SlidingWindowCounters>>>,
62}
63
64impl TrackedConsumer {
65    /// Wrap a lapin consumer for automatic event tracking.
66    pub fn new(
67        consumer: lapin::Consumer,
68        queue_name: impl Into<String>,
69        client: Arc<QueuriousClient>,
70    ) -> Self {
71        Self::with_config(consumer, queue_name, client, LapinAdapterConfig::default())
72    }
73
74    /// Wrap a lapin consumer with custom configuration.
75    pub fn with_config(
76        consumer: lapin::Consumer,
77        queue_name: impl Into<String>,
78        client: Arc<QueuriousClient>,
79        config: LapinAdapterConfig,
80    ) -> Self {
81        Self {
82            inner: consumer,
83            client,
84            queue_name: queue_name.into(),
85            config,
86            #[cfg(feature = "metrics")]
87            counters: None,
88        }
89    }
90
91    /// Attach shared metrics counters for Tier 0 data collection.
92    ///
93    /// When set, each [`TrackedDelivery`] produced by this consumer will
94    /// record processing duration and success/failure to the counters map
95    /// under this consumer's queue name.
96    ///
97    /// The same counters map should be shared with
98    /// [`RabbitMqMetricsCollector`](crate::metrics::RabbitMqMetricsCollector).
99    #[cfg(feature = "metrics")]
100    pub fn with_counters(mut self, counters: Arc<HashMap<String, SlidingWindowCounters>>) -> Self {
101        self.counters = Some(counters);
102        self
103    }
104
105    /// Get a reference to the shared counters, if set.
106    #[cfg(feature = "metrics")]
107    pub fn counters(&self) -> Option<&Arc<HashMap<String, SlidingWindowCounters>>> {
108        self.counters.as_ref()
109    }
110
111    /// Process the next delivery with full lifecycle tracking.
112    ///
113    /// Reports `job_started` via a direct API call and obtains the server-assigned
114    /// `execution_id`, enabling accurate `job_completed`/`job_failed` tracking
115    /// when the returned [`TrackedDelivery`] is acked or nacked.
116    ///
117    /// Returns `None` when the consumer stream ends.
118    pub async fn next_tracked(&mut self) -> Option<Result<TrackedDelivery, TrackedConsumerError>> {
119        let delivery = match self.inner.next().await {
120            Some(Ok(d)) => d,
121            Some(Err(e)) => return Some(Err(TrackedConsumerError::Lapin(e))),
122            None => return None,
123        };
124
125        // Parse the JSON payload once — shared across job_id, correlation_id,
126        // and event payload extraction to avoid redundant deserialization.
127        let parsed_payload: Option<serde_json::Value> = serde_json::from_slice(&delivery.data).ok();
128
129        let job_id = self
130            .config
131            .job_id_extractor
132            .as_ref()
133            .map(|f| f(&delivery))
134            .unwrap_or_else(|| crate::config::extract_job_id(&delivery, parsed_payload.as_ref()));
135
136        let job_type = self
137            .config
138            .job_type_extractor
139            .as_ref()
140            .map(|f| f(&delivery))
141            .unwrap_or_else(|| {
142                crate::config::extract_job_type(
143                    &delivery.properties,
144                    &delivery.routing_key,
145                    &self.config.default_job_type,
146                )
147            });
148
149        let payload = if self.config.include_payload {
150            if delivery.data.len() > self.config.max_payload_size {
151                serde_json::json!({
152                    "_queuerious": "payload_too_large",
153                    "message": format!(
154                        "Payload not stored ({} bytes exceeds {} byte limit)",
155                        delivery.data.len(),
156                        self.config.max_payload_size
157                    ),
158                    "original_size_bytes": delivery.data.len()
159                })
160            } else {
161                parsed_payload.clone().unwrap_or(serde_json::Value::Null)
162            }
163        } else {
164            serde_json::Value::Object(serde_json::Map::new())
165        };
166
167        // Extract metadata from AMQP properties
168        let metadata = self
169            .config
170            .metadata_extractor
171            .as_ref()
172            .map(|f| f(&delivery))
173            .or_else(|| {
174                if self.config.include_default_metadata {
175                    let meta = crate::config::extract_default_metadata(&delivery);
176                    // Only include if there's actual metadata
177                    if meta.as_object().is_none_or(|m| m.is_empty()) {
178                        None
179                    } else {
180                        Some(meta)
181                    }
182                } else {
183                    None
184                }
185            });
186
187        let mut builder =
188            JobEvent::started(&self.queue_name, Backend::RabbitMQ, &job_id, &job_type)
189                .payload(payload);
190
191        if let Some(meta) = metadata {
192            builder = builder.metadata(meta);
193        }
194
195        // Extract correlation_id: custom extractor -> default chain
196        let correlation_id = self
197            .config
198            .correlation_id_extractor
199            .as_ref()
200            .and_then(|f| f(&delivery))
201            .or_else(|| crate::config::extract_correlation_id(&delivery, parsed_payload.as_ref()));
202
203        if let Some(cid) = correlation_id {
204            builder = builder.correlation_id(cid);
205        }
206
207        if let Some(max) = self.config.max_attempts {
208            builder = builder.max_attempts(max);
209        }
210
211        // Propagate retry count from AMQP headers as attempt_number so the
212        // server records the correct attempt instead of always starting at 1.
213        if let Some(retry_count) = crate::config::extract_retry_count(&delivery) {
214            builder = builder.attempt_number(retry_count + 1);
215        }
216
217        let event = builder.build();
218
219        // Build the TrackedDelivery with optional metrics support.
220        let make_tracked = |delivery, execution_id, created_at| {
221            let tracked = TrackedDelivery::new(
222                delivery,
223                self.client.clone(),
224                self.queue_name.clone(),
225                execution_id,
226                created_at,
227            );
228
229            #[cfg(feature = "metrics")]
230            let tracked = if let Some(ref counters) = self.counters {
231                tracked.with_counters(counters.clone())
232            } else {
233                tracked
234            };
235
236            tracked
237        };
238
239        // Send directly and get execution_id from the response.
240        match self.client.report_single(event).await {
241            Ok(response) => {
242                let (execution_id, created_at) = if let Some(result) = response.results.first() {
243                    (result.execution_id, result.created_at)
244                } else {
245                    tracing::warn!(
246                        "Ingest response missing results; lifecycle tracking unavailable"
247                    );
248                    (uuid::Uuid::nil(), chrono::Utc::now())
249                };
250
251                Some(Ok(make_tracked(delivery, execution_id, created_at)))
252            }
253            Err(e) => {
254                tracing::warn!(error = %e, "Failed to report job_started; proceeding without tracking");
255                Some(Ok(make_tracked(
256                    delivery,
257                    uuid::Uuid::nil(),
258                    chrono::Utc::now(),
259                )))
260            }
261        }
262    }
263}
264
265/// Errors from the tracked consumer.
266#[derive(Debug, thiserror::Error)]
267pub enum TrackedConsumerError {
268    /// Lapin error.
269    #[error("Lapin error: {0}")]
270    Lapin(#[from] lapin::Error),
271}