Skip to main content

queuerious_lapin/
config.rs

1//! Lapin adapter configuration.
2
3use std::sync::Arc;
4
5use lapin::types::AMQPValue;
6
7/// Configuration for the Queuerious lapin adapter.
8#[derive(Clone)]
9#[allow(clippy::type_complexity)]
10pub struct LapinAdapterConfig {
11    /// Default job type when not extractable from the message (default: "unknown").
12    pub default_job_type: String,
13    /// Whether to include the message payload in events (default: false).
14    /// May contain PII — enable with caution.
15    pub include_payload: bool,
16    /// Maximum payload size in bytes to include (default: 65536 / 64 KB).
17    /// Payloads exceeding this limit are replaced with a descriptive placeholder.
18    pub max_payload_size: usize,
19    /// Custom function to extract the job type from a delivery.
20    pub job_type_extractor: Option<Arc<dyn Fn(&lapin::message::Delivery) -> String + Send + Sync>>,
21    /// Custom function to extract the job ID from a delivery.
22    /// Defaults to using the delivery tag as a string.
23    pub job_id_extractor: Option<Arc<dyn Fn(&lapin::message::Delivery) -> String + Send + Sync>>,
24    /// Custom function to extract metadata from a delivery.
25    /// When set, this is used instead of the default metadata extraction.
26    pub metadata_extractor:
27        Option<Arc<dyn Fn(&lapin::message::Delivery) -> serde_json::Value + Send + Sync>>,
28    /// Whether to include default AMQP metadata when no custom metadata_extractor is set (default: true).
29    pub include_default_metadata: bool,
30    /// Custom function to extract the correlation ID from a delivery.
31    /// When not set, the default extraction tries: AMQP `correlation_id` property,
32    /// `x-correlation-id` header, then JSON payload fields (`correlation_id`, `correlationId`).
33    pub correlation_id_extractor:
34        Option<Arc<dyn Fn(&lapin::message::Delivery) -> Option<String> + Send + Sync>>,
35    /// Maximum retry attempts for this consumer. When set, included in `job_started`
36    /// events so the server can track attempt progress.
37    pub max_attempts: Option<i32>,
38}
39
40impl Default for LapinAdapterConfig {
41    fn default() -> Self {
42        Self {
43            default_job_type: "unknown".to_string(),
44            include_payload: false,
45            max_payload_size: 65_536,
46            job_type_extractor: None,
47            job_id_extractor: None,
48            metadata_extractor: None,
49            include_default_metadata: true,
50            correlation_id_extractor: None,
51            max_attempts: None,
52        }
53    }
54}
55
56impl std::fmt::Debug for LapinAdapterConfig {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("LapinAdapterConfig")
59            .field("default_job_type", &self.default_job_type)
60            .field("include_payload", &self.include_payload)
61            .field("max_payload_size", &self.max_payload_size)
62            .field(
63                "job_type_extractor",
64                &self.job_type_extractor.as_ref().map(|_| "..."),
65            )
66            .field(
67                "job_id_extractor",
68                &self.job_id_extractor.as_ref().map(|_| "..."),
69            )
70            .field(
71                "metadata_extractor",
72                &self.metadata_extractor.as_ref().map(|_| "..."),
73            )
74            .field("include_default_metadata", &self.include_default_metadata)
75            .field(
76                "correlation_id_extractor",
77                &self.correlation_id_extractor.as_ref().map(|_| "..."),
78            )
79            .field("max_attempts", &self.max_attempts)
80            .finish()
81    }
82}
83
84/// Extract the `x-retry-count` header from a delivery's AMQP properties.
85///
86/// Returns the retry count as an `i32` if the header is present and holds an
87/// integer value, or `None` for first-attempt deliveries (no header).
88///
89/// Used by [`TrackedConsumer`] to propagate the current attempt number
90/// (`retry_count + 1`) in `job_started` events.
91pub(crate) fn extract_retry_count(delivery: &lapin::message::Delivery) -> Option<i32> {
92    delivery
93        .properties
94        .headers()
95        .as_ref()
96        .and_then(|h| h.inner().get("x-retry-count"))
97        .and_then(|v| match v {
98            AMQPValue::LongLongInt(n) => i32::try_from(*n).ok(),
99            AMQPValue::ShortShortInt(n) => Some(i32::from(*n)),
100            AMQPValue::ShortInt(n) => Some(i32::from(*n)),
101            AMQPValue::LongInt(n) => Some(*n),
102            _ => None,
103        })
104}
105
106/// Extract a string value from an AMQP header value (`LongString` or `ShortString`).
107fn amqp_value_as_string(value: &AMQPValue) -> Option<String> {
108    match value {
109        AMQPValue::LongString(s) => {
110            let s = String::from_utf8_lossy(s.as_bytes()).into_owned();
111            if s.is_empty() {
112                None
113            } else {
114                Some(s)
115            }
116        }
117        AMQPValue::ShortString(s) => {
118            let s = s.as_str();
119            if s.is_empty() {
120                None
121            } else {
122                Some(s.to_owned())
123            }
124        }
125        _ => None,
126    }
127}
128
129/// Look up a header by key and extract its string value.
130fn header_string(properties: &lapin::BasicProperties, key: &str) -> Option<String> {
131    properties
132        .headers()
133        .as_ref()
134        .and_then(|table| table.inner().get(key))
135        .and_then(amqp_value_as_string)
136}
137
138/// Smart default job type extraction from AMQP message metadata.
139///
140/// Tries multiple sources in priority order:
141/// 1. AMQP `type` property (`properties.kind()`)
142/// 2. `x-job-type` header (Queuerious convention)
143/// 3. `job_type` header (common convention)
144/// 4. Routing key (often encodes type info, e.g. `"emails.send"`)
145/// 5. Falls back to `default`
146pub(crate) fn extract_job_type(
147    properties: &lapin::BasicProperties,
148    routing_key: &lapin::types::ShortString,
149    default: &str,
150) -> String {
151    // 1. AMQP type property (most standard)
152    if let Some(kind) = properties.kind().as_ref() {
153        let s = kind.as_str();
154        if !s.is_empty() {
155            return s.to_owned();
156        }
157    }
158
159    // 2. x-job-type header (Queuerious convention)
160    if let Some(jt) = header_string(properties, "x-job-type") {
161        return jt;
162    }
163
164    // 3. job_type header (common convention)
165    if let Some(jt) = header_string(properties, "job_type") {
166        return jt;
167    }
168
169    // 4. Routing key
170    let rk = routing_key.as_str();
171    if !rk.is_empty() {
172        return rk.to_owned();
173    }
174
175    // 5. Final fallback
176    default.to_owned()
177}
178
179/// Smart default job ID extraction from AMQP message metadata and payload.
180///
181/// Tries multiple sources in priority order:
182/// 1. AMQP `message_id` property (standard AMQP for message identity)
183/// 2. `x-job-id` header (Queuerious convention)
184/// 3. JSON payload fields: `job_id`, `jobId` (top-level, then one level deep
185///    for externally-tagged enums like `{"CreateEvent": {"job_id": "..."}}`)
186/// 4. Falls back to `delivery_tag` as a string
187///
188/// Pass a pre-parsed `serde_json::Value` to avoid redundant JSON parsing when
189/// multiple extractors need to inspect the same payload.
190pub(crate) fn extract_job_id(
191    delivery: &lapin::message::Delivery,
192    parsed_payload: Option<&serde_json::Value>,
193) -> String {
194    // 1. AMQP message_id property (most standard)
195    if let Some(msg_id) = delivery.properties.message_id().as_ref() {
196        let s = msg_id.as_str();
197        if !s.is_empty() {
198            return s.to_owned();
199        }
200    }
201
202    // 2. x-job-id header (Queuerious convention)
203    if let Some(job_id) = header_string(&delivery.properties, "x-job-id") {
204        return job_id;
205    }
206
207    // 3. JSON payload (common field names)
208    if let Some(id) = extract_string_field_from_payload(parsed_payload, &["job_id", "jobId"]) {
209        return id;
210    }
211
212    // 4. Delivery tag fallback
213    delivery.delivery_tag.to_string()
214}
215
216/// Smart default correlation ID extraction from AMQP message metadata and payload.
217///
218/// Tries multiple sources in priority order:
219/// 1. AMQP `correlation_id` property (standard AMQP)
220/// 2. `x-correlation-id` header
221/// 3. JSON payload fields: `correlation_id`, `correlationId` (top-level, then one
222///    level deep for externally-tagged enums)
223///
224/// Pass a pre-parsed `serde_json::Value` to avoid redundant JSON parsing when
225/// multiple extractors need to inspect the same payload.
226pub(crate) fn extract_correlation_id(
227    delivery: &lapin::message::Delivery,
228    parsed_payload: Option<&serde_json::Value>,
229) -> Option<String> {
230    // 1. AMQP correlation_id property (standard AMQP)
231    if let Some(cid) = delivery.properties.correlation_id().as_ref() {
232        let s = cid.as_str();
233        if !s.is_empty() {
234            return Some(s.to_owned());
235        }
236    }
237
238    // 2. x-correlation-id header
239    if let Some(cid) = header_string(&delivery.properties, "x-correlation-id") {
240        return Some(cid);
241    }
242
243    // 3. JSON payload (common field names)
244    extract_string_field_from_payload(parsed_payload, &["correlation_id", "correlationId"])
245}
246
247/// Try to extract a string field from a pre-parsed JSON value by checking multiple field names.
248///
249/// Handles both flat payloads (`{"job_id": "..."}`) and externally-tagged enums
250/// (`{"VariantName": {"job_id": "..."}}`).
251///
252/// Returns `None` if `parsed` is `None` (e.g. non-JSON payload or parse failure).
253fn extract_string_field_from_payload(
254    parsed: Option<&serde_json::Value>,
255    field_names: &[&str],
256) -> Option<String> {
257    let value = parsed?;
258
259    // Try top-level fields first
260    if let Some(id) = try_extract_string_field(value, field_names) {
261        return Some(id);
262    }
263
264    // For externally-tagged enums: {"VariantName": {"field": ...}}
265    // Check if root is an object with a single key containing an inner object
266    if let Some(obj) = value.as_object() {
267        if obj.len() == 1 {
268            if let Some(inner) = obj.values().next() {
269                return try_extract_string_field(inner, field_names);
270            }
271        }
272    }
273
274    None
275}
276
277/// Try to extract one of the given field names as a non-empty string from a JSON value.
278///
279/// Handles both string and numeric values (many systems use integer job IDs).
280fn try_extract_string_field(value: &serde_json::Value, field_names: &[&str]) -> Option<String> {
281    for key in field_names {
282        match value.get(*key) {
283            Some(serde_json::Value::String(s)) if !s.is_empty() => return Some(s.clone()),
284            Some(serde_json::Value::Number(n)) => return Some(n.to_string()),
285            _ => continue,
286        }
287    }
288    None
289}
290
291/// Extracts default metadata from AMQP message properties and delivery info.
292///
293/// Includes standard AMQP properties (message_id, app_id, content_type, etc.),
294/// delivery metadata (routing_key, exchange), and user-defined headers
295/// (excluding internal RabbitMQ headers like `x-death`).
296pub fn extract_default_metadata(delivery: &lapin::message::Delivery) -> serde_json::Value {
297    extract_metadata_from_parts(
298        &delivery.properties,
299        delivery.routing_key.as_str(),
300        delivery.exchange.as_str(),
301    )
302}
303
304/// Core metadata extraction from AMQP properties and delivery routing info.
305///
306/// This is the inner implementation used by [`extract_default_metadata`]. It is
307/// separated so that the property-level logic can be unit-tested without
308/// constructing a full `lapin::message::Delivery` (which requires internal
309/// channel state).
310fn extract_metadata_from_parts(
311    props: &lapin::BasicProperties,
312    routing_key: &str,
313    exchange: &str,
314) -> serde_json::Value {
315    let mut meta = serde_json::Map::new();
316
317    if let Some(msg_id) = props.message_id().as_ref() {
318        let s = msg_id.as_str();
319        if !s.is_empty() {
320            meta.insert("message_id".into(), serde_json::Value::String(s.to_owned()));
321        }
322    }
323    if let Some(app_id) = props.app_id().as_ref() {
324        let s = app_id.as_str();
325        if !s.is_empty() {
326            meta.insert("app_id".into(), serde_json::Value::String(s.to_owned()));
327        }
328    }
329    if let Some(content_type) = props.content_type().as_ref() {
330        let s = content_type.as_str();
331        if !s.is_empty() {
332            meta.insert(
333                "content_type".into(),
334                serde_json::Value::String(s.to_owned()),
335            );
336        }
337    }
338    if let Some(content_encoding) = props.content_encoding().as_ref() {
339        let s = content_encoding.as_str();
340        if !s.is_empty() {
341            meta.insert(
342                "content_encoding".into(),
343                serde_json::Value::String(s.to_owned()),
344            );
345        }
346    }
347    if let Some(priority) = props.priority() {
348        meta.insert("priority".into(), serde_json::json!(*priority));
349    }
350    if let Some(timestamp) = props.timestamp() {
351        meta.insert("timestamp".into(), serde_json::json!(*timestamp));
352    }
353    if let Some(reply_to) = props.reply_to().as_ref() {
354        let s = reply_to.as_str();
355        if !s.is_empty() {
356            meta.insert("reply_to".into(), serde_json::Value::String(s.to_owned()));
357        }
358    }
359    if let Some(expiration) = props.expiration().as_ref() {
360        let s = expiration.as_str();
361        if !s.is_empty() {
362            meta.insert("expiration".into(), serde_json::Value::String(s.to_owned()));
363        }
364    }
365
366    // Delivery metadata
367    if !routing_key.is_empty() {
368        meta.insert(
369            "routing_key".into(),
370            serde_json::Value::String(routing_key.to_owned()),
371        );
372    }
373    if !exchange.is_empty() {
374        meta.insert(
375            "exchange".into(),
376            serde_json::Value::String(exchange.to_owned()),
377        );
378    }
379
380    // User-defined headers (exclude internal RabbitMQ headers)
381    if let Some(headers) = props.headers().as_ref() {
382        let mut user_headers = serde_json::Map::new();
383        for (key, value) in headers.inner() {
384            let key_str = key.as_str();
385            if key_str.starts_with("x-death")
386                || key_str == "x-first-death-exchange"
387                || key_str == "x-first-death-queue"
388                || key_str == "x-first-death-reason"
389            {
390                continue;
391            }
392            if let Some(s) = amqp_value_as_string(value) {
393                user_headers.insert(key_str.to_owned(), serde_json::Value::String(s));
394            }
395        }
396        if !user_headers.is_empty() {
397            meta.insert("headers".into(), serde_json::Value::Object(user_headers));
398        }
399    }
400
401    serde_json::Value::Object(meta)
402}
403
404#[cfg(test)]
405mod tests {
406    use super::*;
407    use lapin::types::{FieldTable, LongString, ShortString};
408    use lapin::BasicProperties;
409
410    #[test]
411    fn extracts_from_amqp_type_property() {
412        let props = BasicProperties::default().with_type("SendEmail".into());
413        let rk = ShortString::from("");
414        assert_eq!(extract_job_type(&props, &rk, "unknown"), "SendEmail");
415    }
416
417    #[test]
418    fn extracts_from_x_job_type_header() {
419        let mut headers = FieldTable::default();
420        headers.insert(
421            "x-job-type".into(),
422            AMQPValue::LongString(LongString::from("ProcessPayment")),
423        );
424        let props = BasicProperties::default().with_headers(headers);
425        let rk = ShortString::from("some.routing.key");
426        assert_eq!(extract_job_type(&props, &rk, "unknown"), "ProcessPayment");
427    }
428
429    #[test]
430    fn extracts_from_job_type_header() {
431        let mut headers = FieldTable::default();
432        headers.insert(
433            "job_type".into(),
434            AMQPValue::LongString(LongString::from("GenerateReport")),
435        );
436        let props = BasicProperties::default().with_headers(headers);
437        let rk = ShortString::from("");
438        assert_eq!(extract_job_type(&props, &rk, "unknown"), "GenerateReport");
439    }
440
441    #[test]
442    fn extracts_from_routing_key() {
443        let props = BasicProperties::default();
444        let rk = ShortString::from("orders.process");
445        assert_eq!(extract_job_type(&props, &rk, "unknown"), "orders.process");
446    }
447
448    #[test]
449    fn falls_back_to_default() {
450        let props = BasicProperties::default();
451        let rk = ShortString::from("");
452        assert_eq!(extract_job_type(&props, &rk, "fallback"), "fallback");
453    }
454
455    #[test]
456    fn amqp_type_takes_priority_over_headers_and_routing_key() {
457        let mut headers = FieldTable::default();
458        headers.insert(
459            "x-job-type".into(),
460            AMQPValue::LongString(LongString::from("from_header")),
461        );
462        let props = BasicProperties::default()
463            .with_type("from_type".into())
464            .with_headers(headers);
465        let rk = ShortString::from("from_routing_key");
466        assert_eq!(extract_job_type(&props, &rk, "unknown"), "from_type");
467    }
468
469    #[test]
470    fn x_job_type_takes_priority_over_job_type_header() {
471        let mut headers = FieldTable::default();
472        headers.insert(
473            "x-job-type".into(),
474            AMQPValue::LongString(LongString::from("from_x_header")),
475        );
476        headers.insert(
477            "job_type".into(),
478            AMQPValue::LongString(LongString::from("from_generic_header")),
479        );
480        let props = BasicProperties::default().with_headers(headers);
481        let rk = ShortString::from("");
482        assert_eq!(extract_job_type(&props, &rk, "unknown"), "from_x_header");
483    }
484
485    #[test]
486    fn handles_short_string_header_values() {
487        let mut headers = FieldTable::default();
488        headers.insert(
489            "x-job-type".into(),
490            AMQPValue::ShortString("ShortVal".into()),
491        );
492        let props = BasicProperties::default().with_headers(headers);
493        let rk = ShortString::from("");
494        assert_eq!(extract_job_type(&props, &rk, "unknown"), "ShortVal");
495    }
496
497    #[test]
498    fn skips_empty_amqp_type() {
499        let props = BasicProperties::default().with_type("".into());
500        let rk = ShortString::from("fallback_rk");
501        assert_eq!(extract_job_type(&props, &rk, "unknown"), "fallback_rk");
502    }
503
504    #[test]
505    fn skips_empty_header_values() {
506        let mut headers = FieldTable::default();
507        headers.insert(
508            "x-job-type".into(),
509            AMQPValue::LongString(LongString::from("")),
510        );
511        let props = BasicProperties::default().with_headers(headers);
512        let rk = ShortString::from("routing_fallback");
513        assert_eq!(extract_job_type(&props, &rk, "unknown"), "routing_fallback");
514    }
515
516    // --- extract_metadata_from_parts tests ---
517
518    #[test]
519    fn metadata_extracts_standard_amqp_properties() {
520        let props = BasicProperties::default()
521            .with_app_id("my-app".into())
522            .with_message_id("msg-123".into())
523            .with_content_type("application/json".into())
524            .with_content_encoding("utf-8".into())
525            .with_reply_to("reply-queue".into())
526            .with_expiration("60000".into())
527            .with_priority(5)
528            .with_timestamp(1700000000);
529
530        let meta = extract_metadata_from_parts(&props, "", "");
531        let obj = meta.as_object().unwrap();
532
533        assert_eq!(obj.get("app_id").unwrap(), "my-app");
534        assert_eq!(obj.get("message_id").unwrap(), "msg-123");
535        assert_eq!(obj.get("content_type").unwrap(), "application/json");
536        assert_eq!(obj.get("content_encoding").unwrap(), "utf-8");
537        assert_eq!(obj.get("reply_to").unwrap(), "reply-queue");
538        assert_eq!(obj.get("expiration").unwrap(), "60000");
539        assert_eq!(obj.get("priority").unwrap(), 5);
540        assert_eq!(obj.get("timestamp").unwrap(), 1700000000u64);
541    }
542
543    #[test]
544    fn metadata_includes_routing_key_and_exchange() {
545        let props = BasicProperties::default();
546        let meta = extract_metadata_from_parts(&props, "orders.process", "my-exchange");
547        let obj = meta.as_object().unwrap();
548
549        assert_eq!(obj.get("routing_key").unwrap(), "orders.process");
550        assert_eq!(obj.get("exchange").unwrap(), "my-exchange");
551    }
552
553    #[test]
554    fn metadata_empty_for_default_properties_and_empty_routing() {
555        let props = BasicProperties::default();
556        let meta = extract_metadata_from_parts(&props, "", "");
557        let obj = meta.as_object().unwrap();
558
559        assert!(obj.is_empty(), "Expected empty metadata, got: {obj:?}");
560    }
561
562    #[test]
563    fn metadata_skips_empty_string_properties() {
564        let props = BasicProperties::default()
565            .with_app_id("".into())
566            .with_message_id("".into())
567            .with_content_type("".into())
568            .with_reply_to("".into())
569            .with_expiration("".into());
570
571        let meta = extract_metadata_from_parts(&props, "", "");
572        let obj = meta.as_object().unwrap();
573
574        assert!(
575            obj.is_empty(),
576            "Expected empty metadata for empty string props, got: {obj:?}"
577        );
578    }
579
580    #[test]
581    fn metadata_filters_internal_rabbitmq_headers() {
582        let mut headers = FieldTable::default();
583        headers.insert(
584            "x-death".into(),
585            AMQPValue::LongString(LongString::from("death-info")),
586        );
587        headers.insert(
588            "x-death-count".into(),
589            AMQPValue::LongString(LongString::from("3")),
590        );
591        headers.insert(
592            "x-first-death-exchange".into(),
593            AMQPValue::LongString(LongString::from("dlx")),
594        );
595        headers.insert(
596            "x-first-death-queue".into(),
597            AMQPValue::LongString(LongString::from("dlq")),
598        );
599        headers.insert(
600            "x-first-death-reason".into(),
601            AMQPValue::LongString(LongString::from("rejected")),
602        );
603        headers.insert(
604            "custom-header".into(),
605            AMQPValue::LongString(LongString::from("my-value")),
606        );
607        headers.insert(
608            "correlation-id".into(),
609            AMQPValue::LongString(LongString::from("abc-123")),
610        );
611        let props = BasicProperties::default().with_headers(headers);
612
613        let meta = extract_metadata_from_parts(&props, "", "");
614        let obj = meta.as_object().unwrap();
615        let user_headers = obj.get("headers").unwrap().as_object().unwrap();
616
617        // Internal headers must be filtered out
618        assert!(!user_headers.contains_key("x-death"));
619        assert!(!user_headers.contains_key("x-death-count"));
620        assert!(!user_headers.contains_key("x-first-death-exchange"));
621        assert!(!user_headers.contains_key("x-first-death-queue"));
622        assert!(!user_headers.contains_key("x-first-death-reason"));
623
624        // User headers must be preserved
625        assert_eq!(user_headers.get("custom-header").unwrap(), "my-value");
626        assert_eq!(user_headers.get("correlation-id").unwrap(), "abc-123");
627    }
628
629    #[test]
630    fn metadata_includes_short_string_headers() {
631        let mut headers = FieldTable::default();
632        headers.insert("tenant-id".into(), AMQPValue::ShortString("org_123".into()));
633        let props = BasicProperties::default().with_headers(headers);
634
635        let meta = extract_metadata_from_parts(&props, "", "");
636        let obj = meta.as_object().unwrap();
637        let user_headers = obj.get("headers").unwrap().as_object().unwrap();
638
639        assert_eq!(user_headers.get("tenant-id").unwrap(), "org_123");
640    }
641
642    #[test]
643    fn metadata_omits_headers_section_when_all_filtered() {
644        let mut headers = FieldTable::default();
645        headers.insert(
646            "x-death".into(),
647            AMQPValue::LongString(LongString::from("death-info")),
648        );
649        headers.insert(
650            "x-first-death-exchange".into(),
651            AMQPValue::LongString(LongString::from("dlx")),
652        );
653        let props = BasicProperties::default().with_headers(headers);
654
655        let meta = extract_metadata_from_parts(&props, "", "");
656        let obj = meta.as_object().unwrap();
657
658        assert!(
659            !obj.contains_key("headers"),
660            "Headers section should be omitted when all headers are internal"
661        );
662    }
663
664    #[test]
665    fn metadata_skips_non_string_header_values() {
666        let mut headers = FieldTable::default();
667        headers.insert("numeric-header".into(), AMQPValue::LongLongInt(42));
668        headers.insert("bool-header".into(), AMQPValue::Boolean(true));
669        headers.insert(
670            "string-header".into(),
671            AMQPValue::LongString(LongString::from("kept")),
672        );
673        let props = BasicProperties::default().with_headers(headers);
674
675        let meta = extract_metadata_from_parts(&props, "", "");
676        let obj = meta.as_object().unwrap();
677        let user_headers = obj.get("headers").unwrap().as_object().unwrap();
678
679        // Non-string values are skipped by amqp_value_as_string
680        assert!(!user_headers.contains_key("numeric-header"));
681        assert!(!user_headers.contains_key("bool-header"));
682        assert_eq!(user_headers.get("string-header").unwrap(), "kept");
683    }
684
685    #[test]
686    fn metadata_combined_properties_and_delivery_info() {
687        let props = BasicProperties::default()
688            .with_app_id("worker-svc".into())
689            .with_content_type("application/json".into());
690
691        let meta = extract_metadata_from_parts(&props, "emails.send", "notifications");
692        let obj = meta.as_object().unwrap();
693
694        assert_eq!(obj.get("app_id").unwrap(), "worker-svc");
695        assert_eq!(obj.get("content_type").unwrap(), "application/json");
696        assert_eq!(obj.get("routing_key").unwrap(), "emails.send");
697        assert_eq!(obj.get("exchange").unwrap(), "notifications");
698        // No headers key since none were set
699        assert!(!obj.contains_key("headers"));
700    }
701
702    #[test]
703    fn config_default_includes_metadata() {
704        let config = LapinAdapterConfig::default();
705        assert!(config.include_default_metadata);
706        assert!(config.metadata_extractor.is_none());
707    }
708
709    #[test]
710    fn config_debug_includes_metadata_fields() {
711        let config = LapinAdapterConfig::default();
712        let debug = format!("{config:?}");
713        assert!(debug.contains("metadata_extractor"));
714        assert!(debug.contains("include_default_metadata"));
715    }
716
717    #[test]
718    fn config_default_has_no_correlation_id_extractor() {
719        let config = LapinAdapterConfig::default();
720        assert!(config.correlation_id_extractor.is_none());
721        assert!(config.max_attempts.is_none());
722    }
723
724    #[test]
725    fn config_debug_includes_new_fields() {
726        let config = LapinAdapterConfig::default();
727        let debug = format!("{config:?}");
728        assert!(debug.contains("correlation_id_extractor"));
729        assert!(debug.contains("max_attempts"));
730    }
731
732    // --- extract_string_field_from_payload tests ---
733
734    fn parse(json: &str) -> serde_json::Value {
735        serde_json::from_str(json).unwrap()
736    }
737
738    #[test]
739    fn payload_extracts_flat_job_id() {
740        let v = parse(r#"{"job_id": "abc-123", "name": "test"}"#);
741        assert_eq!(
742            extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
743            Some("abc-123".to_owned())
744        );
745    }
746
747    #[test]
748    fn payload_extracts_camel_case_job_id() {
749        let v = parse(r#"{"jobId": "def-456"}"#);
750        assert_eq!(
751            extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
752            Some("def-456".to_owned())
753        );
754    }
755
756    #[test]
757    fn payload_does_not_extract_generic_id() {
758        let v = parse(r#"{"id": "ghi-789", "name": "test"}"#);
759        assert_eq!(
760            extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
761            None
762        );
763    }
764
765    #[test]
766    fn payload_job_id_takes_priority_over_camel_case() {
767        let v = parse(r#"{"job_id": "snake", "jobId": "camel"}"#);
768        assert_eq!(
769            extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
770            Some("snake".to_owned())
771        );
772    }
773
774    #[test]
775    fn payload_extracts_from_externally_tagged_enum() {
776        let v = parse(r#"{"CreateEvent": {"job_id": "019-uuid", "correlation_id": "trace-1"}}"#);
777        assert_eq!(
778            extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
779            Some("019-uuid".to_owned())
780        );
781    }
782
783    #[test]
784    fn payload_tagged_enum_correlation_id() {
785        let v = parse(r#"{"CreateEvent": {"job_id": "019-uuid", "correlation_id": "trace-1"}}"#);
786        assert_eq!(
787            extract_string_field_from_payload(Some(&v), &["correlation_id", "correlationId"]),
788            Some("trace-1".to_owned())
789        );
790    }
791
792    #[test]
793    fn payload_returns_none_for_missing_field() {
794        let v = parse(r#"{"name": "test", "count": 42}"#);
795        assert_eq!(
796            extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
797            None
798        );
799    }
800
801    #[test]
802    fn payload_returns_none_when_no_parsed_value() {
803        assert_eq!(extract_string_field_from_payload(None, &["job_id"]), None);
804    }
805
806    #[test]
807    fn payload_skips_empty_string_values() {
808        let v = parse(r#"{"job_id": "", "jobId": "fallback"}"#);
809        assert_eq!(
810            extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
811            Some("fallback".to_owned())
812        );
813    }
814
815    #[test]
816    fn payload_extracts_numeric_job_id() {
817        let v = parse(r#"{"job_id": 12345}"#);
818        assert_eq!(
819            extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
820            Some("12345".to_owned())
821        );
822    }
823
824    #[test]
825    fn payload_extracts_numeric_zero() {
826        let v = parse(r#"{"job_id": 0}"#);
827        assert_eq!(
828            extract_string_field_from_payload(Some(&v), &["job_id"]),
829            Some("0".to_owned())
830        );
831    }
832
833    #[test]
834    fn payload_extracts_numeric_from_tagged_enum() {
835        let v = parse(r#"{"SendEmail": {"job_id": 42}}"#);
836        assert_eq!(
837            extract_string_field_from_payload(Some(&v), &["job_id"]),
838            Some("42".to_owned())
839        );
840    }
841
842    #[test]
843    fn payload_string_preferred_over_numeric_by_field_order() {
844        // When job_id is numeric but jobId is a string, job_id (first in field_names) wins
845        let v = parse(r#"{"job_id": 42, "jobId": "string-id"}"#);
846        assert_eq!(
847            extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
848            Some("42".to_owned())
849        );
850    }
851
852    #[test]
853    fn payload_skips_non_string_non_numeric_values() {
854        let v = parse(r#"{"job_id": true, "jobId": "string-id"}"#);
855        assert_eq!(
856            extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
857            Some("string-id".to_owned())
858        );
859    }
860
861    #[test]
862    fn payload_does_not_recurse_into_multi_key_objects() {
863        let v = parse(r#"{"key1": {"job_id": "inner"}, "key2": "val"}"#);
864        assert_eq!(
865            extract_string_field_from_payload(Some(&v), &["job_id"]),
866            None
867        );
868    }
869
870    #[test]
871    fn payload_handles_null_correlation_id() {
872        let v = parse(r#"{"CreateEvent": {"job_id": "019", "correlation_id": null}}"#);
873        assert_eq!(
874            extract_string_field_from_payload(Some(&v), &["correlation_id", "correlationId"]),
875            None
876        );
877    }
878}