1use std::sync::Arc;
4
5use lapin::types::AMQPValue;
6
7#[derive(Clone)]
9#[allow(clippy::type_complexity)]
10pub struct LapinAdapterConfig {
11 pub default_job_type: String,
13 pub include_payload: bool,
16 pub max_payload_size: usize,
19 pub job_type_extractor: Option<Arc<dyn Fn(&lapin::message::Delivery) -> String + Send + Sync>>,
21 pub job_id_extractor: Option<Arc<dyn Fn(&lapin::message::Delivery) -> String + Send + Sync>>,
24 pub metadata_extractor:
27 Option<Arc<dyn Fn(&lapin::message::Delivery) -> serde_json::Value + Send + Sync>>,
28 pub include_default_metadata: bool,
30 pub correlation_id_extractor:
34 Option<Arc<dyn Fn(&lapin::message::Delivery) -> Option<String> + Send + Sync>>,
35 pub max_attempts: Option<i32>,
38}
39
40impl Default for LapinAdapterConfig {
41 fn default() -> Self {
42 Self {
43 default_job_type: "unknown".to_string(),
44 include_payload: false,
45 max_payload_size: 65_536,
46 job_type_extractor: None,
47 job_id_extractor: None,
48 metadata_extractor: None,
49 include_default_metadata: true,
50 correlation_id_extractor: None,
51 max_attempts: None,
52 }
53 }
54}
55
56impl std::fmt::Debug for LapinAdapterConfig {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("LapinAdapterConfig")
59 .field("default_job_type", &self.default_job_type)
60 .field("include_payload", &self.include_payload)
61 .field("max_payload_size", &self.max_payload_size)
62 .field(
63 "job_type_extractor",
64 &self.job_type_extractor.as_ref().map(|_| "..."),
65 )
66 .field(
67 "job_id_extractor",
68 &self.job_id_extractor.as_ref().map(|_| "..."),
69 )
70 .field(
71 "metadata_extractor",
72 &self.metadata_extractor.as_ref().map(|_| "..."),
73 )
74 .field("include_default_metadata", &self.include_default_metadata)
75 .field(
76 "correlation_id_extractor",
77 &self.correlation_id_extractor.as_ref().map(|_| "..."),
78 )
79 .field("max_attempts", &self.max_attempts)
80 .finish()
81 }
82}
83
84pub(crate) fn extract_retry_count(delivery: &lapin::message::Delivery) -> Option<i32> {
92 delivery
93 .properties
94 .headers()
95 .as_ref()
96 .and_then(|h| h.inner().get("x-retry-count"))
97 .and_then(|v| match v {
98 AMQPValue::LongLongInt(n) => i32::try_from(*n).ok(),
99 AMQPValue::ShortShortInt(n) => Some(i32::from(*n)),
100 AMQPValue::ShortInt(n) => Some(i32::from(*n)),
101 AMQPValue::LongInt(n) => Some(*n),
102 _ => None,
103 })
104}
105
106fn amqp_value_as_string(value: &AMQPValue) -> Option<String> {
108 match value {
109 AMQPValue::LongString(s) => {
110 let s = String::from_utf8_lossy(s.as_bytes()).into_owned();
111 if s.is_empty() {
112 None
113 } else {
114 Some(s)
115 }
116 }
117 AMQPValue::ShortString(s) => {
118 let s = s.as_str();
119 if s.is_empty() {
120 None
121 } else {
122 Some(s.to_owned())
123 }
124 }
125 _ => None,
126 }
127}
128
129fn header_string(properties: &lapin::BasicProperties, key: &str) -> Option<String> {
131 properties
132 .headers()
133 .as_ref()
134 .and_then(|table| table.inner().get(key))
135 .and_then(amqp_value_as_string)
136}
137
138pub(crate) fn extract_job_type(
147 properties: &lapin::BasicProperties,
148 routing_key: &lapin::types::ShortString,
149 default: &str,
150) -> String {
151 if let Some(kind) = properties.kind().as_ref() {
153 let s = kind.as_str();
154 if !s.is_empty() {
155 return s.to_owned();
156 }
157 }
158
159 if let Some(jt) = header_string(properties, "x-job-type") {
161 return jt;
162 }
163
164 if let Some(jt) = header_string(properties, "job_type") {
166 return jt;
167 }
168
169 let rk = routing_key.as_str();
171 if !rk.is_empty() {
172 return rk.to_owned();
173 }
174
175 default.to_owned()
177}
178
179pub(crate) fn extract_job_id(
191 delivery: &lapin::message::Delivery,
192 parsed_payload: Option<&serde_json::Value>,
193) -> String {
194 if let Some(msg_id) = delivery.properties.message_id().as_ref() {
196 let s = msg_id.as_str();
197 if !s.is_empty() {
198 return s.to_owned();
199 }
200 }
201
202 if let Some(job_id) = header_string(&delivery.properties, "x-job-id") {
204 return job_id;
205 }
206
207 if let Some(id) = extract_string_field_from_payload(parsed_payload, &["job_id", "jobId"]) {
209 return id;
210 }
211
212 delivery.delivery_tag.to_string()
214}
215
216pub(crate) fn extract_correlation_id(
227 delivery: &lapin::message::Delivery,
228 parsed_payload: Option<&serde_json::Value>,
229) -> Option<String> {
230 if let Some(cid) = delivery.properties.correlation_id().as_ref() {
232 let s = cid.as_str();
233 if !s.is_empty() {
234 return Some(s.to_owned());
235 }
236 }
237
238 if let Some(cid) = header_string(&delivery.properties, "x-correlation-id") {
240 return Some(cid);
241 }
242
243 extract_string_field_from_payload(parsed_payload, &["correlation_id", "correlationId"])
245}
246
247fn extract_string_field_from_payload(
254 parsed: Option<&serde_json::Value>,
255 field_names: &[&str],
256) -> Option<String> {
257 let value = parsed?;
258
259 if let Some(id) = try_extract_string_field(value, field_names) {
261 return Some(id);
262 }
263
264 if let Some(obj) = value.as_object() {
267 if obj.len() == 1 {
268 if let Some(inner) = obj.values().next() {
269 return try_extract_string_field(inner, field_names);
270 }
271 }
272 }
273
274 None
275}
276
277fn try_extract_string_field(value: &serde_json::Value, field_names: &[&str]) -> Option<String> {
281 for key in field_names {
282 match value.get(*key) {
283 Some(serde_json::Value::String(s)) if !s.is_empty() => return Some(s.clone()),
284 Some(serde_json::Value::Number(n)) => return Some(n.to_string()),
285 _ => continue,
286 }
287 }
288 None
289}
290
291pub fn extract_default_metadata(delivery: &lapin::message::Delivery) -> serde_json::Value {
297 extract_metadata_from_parts(
298 &delivery.properties,
299 delivery.routing_key.as_str(),
300 delivery.exchange.as_str(),
301 )
302}
303
304fn extract_metadata_from_parts(
311 props: &lapin::BasicProperties,
312 routing_key: &str,
313 exchange: &str,
314) -> serde_json::Value {
315 let mut meta = serde_json::Map::new();
316
317 if let Some(msg_id) = props.message_id().as_ref() {
318 let s = msg_id.as_str();
319 if !s.is_empty() {
320 meta.insert("message_id".into(), serde_json::Value::String(s.to_owned()));
321 }
322 }
323 if let Some(app_id) = props.app_id().as_ref() {
324 let s = app_id.as_str();
325 if !s.is_empty() {
326 meta.insert("app_id".into(), serde_json::Value::String(s.to_owned()));
327 }
328 }
329 if let Some(content_type) = props.content_type().as_ref() {
330 let s = content_type.as_str();
331 if !s.is_empty() {
332 meta.insert(
333 "content_type".into(),
334 serde_json::Value::String(s.to_owned()),
335 );
336 }
337 }
338 if let Some(content_encoding) = props.content_encoding().as_ref() {
339 let s = content_encoding.as_str();
340 if !s.is_empty() {
341 meta.insert(
342 "content_encoding".into(),
343 serde_json::Value::String(s.to_owned()),
344 );
345 }
346 }
347 if let Some(priority) = props.priority() {
348 meta.insert("priority".into(), serde_json::json!(*priority));
349 }
350 if let Some(timestamp) = props.timestamp() {
351 meta.insert("timestamp".into(), serde_json::json!(*timestamp));
352 }
353 if let Some(reply_to) = props.reply_to().as_ref() {
354 let s = reply_to.as_str();
355 if !s.is_empty() {
356 meta.insert("reply_to".into(), serde_json::Value::String(s.to_owned()));
357 }
358 }
359 if let Some(expiration) = props.expiration().as_ref() {
360 let s = expiration.as_str();
361 if !s.is_empty() {
362 meta.insert("expiration".into(), serde_json::Value::String(s.to_owned()));
363 }
364 }
365
366 if !routing_key.is_empty() {
368 meta.insert(
369 "routing_key".into(),
370 serde_json::Value::String(routing_key.to_owned()),
371 );
372 }
373 if !exchange.is_empty() {
374 meta.insert(
375 "exchange".into(),
376 serde_json::Value::String(exchange.to_owned()),
377 );
378 }
379
380 if let Some(headers) = props.headers().as_ref() {
382 let mut user_headers = serde_json::Map::new();
383 for (key, value) in headers.inner() {
384 let key_str = key.as_str();
385 if key_str.starts_with("x-death")
386 || key_str == "x-first-death-exchange"
387 || key_str == "x-first-death-queue"
388 || key_str == "x-first-death-reason"
389 {
390 continue;
391 }
392 if let Some(s) = amqp_value_as_string(value) {
393 user_headers.insert(key_str.to_owned(), serde_json::Value::String(s));
394 }
395 }
396 if !user_headers.is_empty() {
397 meta.insert("headers".into(), serde_json::Value::Object(user_headers));
398 }
399 }
400
401 serde_json::Value::Object(meta)
402}
403
404#[cfg(test)]
405mod tests {
406 use super::*;
407 use lapin::types::{FieldTable, LongString, ShortString};
408 use lapin::BasicProperties;
409
410 #[test]
411 fn extracts_from_amqp_type_property() {
412 let props = BasicProperties::default().with_type("SendEmail".into());
413 let rk = ShortString::from("");
414 assert_eq!(extract_job_type(&props, &rk, "unknown"), "SendEmail");
415 }
416
417 #[test]
418 fn extracts_from_x_job_type_header() {
419 let mut headers = FieldTable::default();
420 headers.insert(
421 "x-job-type".into(),
422 AMQPValue::LongString(LongString::from("ProcessPayment")),
423 );
424 let props = BasicProperties::default().with_headers(headers);
425 let rk = ShortString::from("some.routing.key");
426 assert_eq!(extract_job_type(&props, &rk, "unknown"), "ProcessPayment");
427 }
428
429 #[test]
430 fn extracts_from_job_type_header() {
431 let mut headers = FieldTable::default();
432 headers.insert(
433 "job_type".into(),
434 AMQPValue::LongString(LongString::from("GenerateReport")),
435 );
436 let props = BasicProperties::default().with_headers(headers);
437 let rk = ShortString::from("");
438 assert_eq!(extract_job_type(&props, &rk, "unknown"), "GenerateReport");
439 }
440
441 #[test]
442 fn extracts_from_routing_key() {
443 let props = BasicProperties::default();
444 let rk = ShortString::from("orders.process");
445 assert_eq!(extract_job_type(&props, &rk, "unknown"), "orders.process");
446 }
447
448 #[test]
449 fn falls_back_to_default() {
450 let props = BasicProperties::default();
451 let rk = ShortString::from("");
452 assert_eq!(extract_job_type(&props, &rk, "fallback"), "fallback");
453 }
454
455 #[test]
456 fn amqp_type_takes_priority_over_headers_and_routing_key() {
457 let mut headers = FieldTable::default();
458 headers.insert(
459 "x-job-type".into(),
460 AMQPValue::LongString(LongString::from("from_header")),
461 );
462 let props = BasicProperties::default()
463 .with_type("from_type".into())
464 .with_headers(headers);
465 let rk = ShortString::from("from_routing_key");
466 assert_eq!(extract_job_type(&props, &rk, "unknown"), "from_type");
467 }
468
469 #[test]
470 fn x_job_type_takes_priority_over_job_type_header() {
471 let mut headers = FieldTable::default();
472 headers.insert(
473 "x-job-type".into(),
474 AMQPValue::LongString(LongString::from("from_x_header")),
475 );
476 headers.insert(
477 "job_type".into(),
478 AMQPValue::LongString(LongString::from("from_generic_header")),
479 );
480 let props = BasicProperties::default().with_headers(headers);
481 let rk = ShortString::from("");
482 assert_eq!(extract_job_type(&props, &rk, "unknown"), "from_x_header");
483 }
484
485 #[test]
486 fn handles_short_string_header_values() {
487 let mut headers = FieldTable::default();
488 headers.insert(
489 "x-job-type".into(),
490 AMQPValue::ShortString("ShortVal".into()),
491 );
492 let props = BasicProperties::default().with_headers(headers);
493 let rk = ShortString::from("");
494 assert_eq!(extract_job_type(&props, &rk, "unknown"), "ShortVal");
495 }
496
497 #[test]
498 fn skips_empty_amqp_type() {
499 let props = BasicProperties::default().with_type("".into());
500 let rk = ShortString::from("fallback_rk");
501 assert_eq!(extract_job_type(&props, &rk, "unknown"), "fallback_rk");
502 }
503
504 #[test]
505 fn skips_empty_header_values() {
506 let mut headers = FieldTable::default();
507 headers.insert(
508 "x-job-type".into(),
509 AMQPValue::LongString(LongString::from("")),
510 );
511 let props = BasicProperties::default().with_headers(headers);
512 let rk = ShortString::from("routing_fallback");
513 assert_eq!(extract_job_type(&props, &rk, "unknown"), "routing_fallback");
514 }
515
516 #[test]
519 fn metadata_extracts_standard_amqp_properties() {
520 let props = BasicProperties::default()
521 .with_app_id("my-app".into())
522 .with_message_id("msg-123".into())
523 .with_content_type("application/json".into())
524 .with_content_encoding("utf-8".into())
525 .with_reply_to("reply-queue".into())
526 .with_expiration("60000".into())
527 .with_priority(5)
528 .with_timestamp(1700000000);
529
530 let meta = extract_metadata_from_parts(&props, "", "");
531 let obj = meta.as_object().unwrap();
532
533 assert_eq!(obj.get("app_id").unwrap(), "my-app");
534 assert_eq!(obj.get("message_id").unwrap(), "msg-123");
535 assert_eq!(obj.get("content_type").unwrap(), "application/json");
536 assert_eq!(obj.get("content_encoding").unwrap(), "utf-8");
537 assert_eq!(obj.get("reply_to").unwrap(), "reply-queue");
538 assert_eq!(obj.get("expiration").unwrap(), "60000");
539 assert_eq!(obj.get("priority").unwrap(), 5);
540 assert_eq!(obj.get("timestamp").unwrap(), 1700000000u64);
541 }
542
543 #[test]
544 fn metadata_includes_routing_key_and_exchange() {
545 let props = BasicProperties::default();
546 let meta = extract_metadata_from_parts(&props, "orders.process", "my-exchange");
547 let obj = meta.as_object().unwrap();
548
549 assert_eq!(obj.get("routing_key").unwrap(), "orders.process");
550 assert_eq!(obj.get("exchange").unwrap(), "my-exchange");
551 }
552
553 #[test]
554 fn metadata_empty_for_default_properties_and_empty_routing() {
555 let props = BasicProperties::default();
556 let meta = extract_metadata_from_parts(&props, "", "");
557 let obj = meta.as_object().unwrap();
558
559 assert!(obj.is_empty(), "Expected empty metadata, got: {obj:?}");
560 }
561
562 #[test]
563 fn metadata_skips_empty_string_properties() {
564 let props = BasicProperties::default()
565 .with_app_id("".into())
566 .with_message_id("".into())
567 .with_content_type("".into())
568 .with_reply_to("".into())
569 .with_expiration("".into());
570
571 let meta = extract_metadata_from_parts(&props, "", "");
572 let obj = meta.as_object().unwrap();
573
574 assert!(
575 obj.is_empty(),
576 "Expected empty metadata for empty string props, got: {obj:?}"
577 );
578 }
579
580 #[test]
581 fn metadata_filters_internal_rabbitmq_headers() {
582 let mut headers = FieldTable::default();
583 headers.insert(
584 "x-death".into(),
585 AMQPValue::LongString(LongString::from("death-info")),
586 );
587 headers.insert(
588 "x-death-count".into(),
589 AMQPValue::LongString(LongString::from("3")),
590 );
591 headers.insert(
592 "x-first-death-exchange".into(),
593 AMQPValue::LongString(LongString::from("dlx")),
594 );
595 headers.insert(
596 "x-first-death-queue".into(),
597 AMQPValue::LongString(LongString::from("dlq")),
598 );
599 headers.insert(
600 "x-first-death-reason".into(),
601 AMQPValue::LongString(LongString::from("rejected")),
602 );
603 headers.insert(
604 "custom-header".into(),
605 AMQPValue::LongString(LongString::from("my-value")),
606 );
607 headers.insert(
608 "correlation-id".into(),
609 AMQPValue::LongString(LongString::from("abc-123")),
610 );
611 let props = BasicProperties::default().with_headers(headers);
612
613 let meta = extract_metadata_from_parts(&props, "", "");
614 let obj = meta.as_object().unwrap();
615 let user_headers = obj.get("headers").unwrap().as_object().unwrap();
616
617 assert!(!user_headers.contains_key("x-death"));
619 assert!(!user_headers.contains_key("x-death-count"));
620 assert!(!user_headers.contains_key("x-first-death-exchange"));
621 assert!(!user_headers.contains_key("x-first-death-queue"));
622 assert!(!user_headers.contains_key("x-first-death-reason"));
623
624 assert_eq!(user_headers.get("custom-header").unwrap(), "my-value");
626 assert_eq!(user_headers.get("correlation-id").unwrap(), "abc-123");
627 }
628
629 #[test]
630 fn metadata_includes_short_string_headers() {
631 let mut headers = FieldTable::default();
632 headers.insert("tenant-id".into(), AMQPValue::ShortString("org_123".into()));
633 let props = BasicProperties::default().with_headers(headers);
634
635 let meta = extract_metadata_from_parts(&props, "", "");
636 let obj = meta.as_object().unwrap();
637 let user_headers = obj.get("headers").unwrap().as_object().unwrap();
638
639 assert_eq!(user_headers.get("tenant-id").unwrap(), "org_123");
640 }
641
642 #[test]
643 fn metadata_omits_headers_section_when_all_filtered() {
644 let mut headers = FieldTable::default();
645 headers.insert(
646 "x-death".into(),
647 AMQPValue::LongString(LongString::from("death-info")),
648 );
649 headers.insert(
650 "x-first-death-exchange".into(),
651 AMQPValue::LongString(LongString::from("dlx")),
652 );
653 let props = BasicProperties::default().with_headers(headers);
654
655 let meta = extract_metadata_from_parts(&props, "", "");
656 let obj = meta.as_object().unwrap();
657
658 assert!(
659 !obj.contains_key("headers"),
660 "Headers section should be omitted when all headers are internal"
661 );
662 }
663
664 #[test]
665 fn metadata_skips_non_string_header_values() {
666 let mut headers = FieldTable::default();
667 headers.insert("numeric-header".into(), AMQPValue::LongLongInt(42));
668 headers.insert("bool-header".into(), AMQPValue::Boolean(true));
669 headers.insert(
670 "string-header".into(),
671 AMQPValue::LongString(LongString::from("kept")),
672 );
673 let props = BasicProperties::default().with_headers(headers);
674
675 let meta = extract_metadata_from_parts(&props, "", "");
676 let obj = meta.as_object().unwrap();
677 let user_headers = obj.get("headers").unwrap().as_object().unwrap();
678
679 assert!(!user_headers.contains_key("numeric-header"));
681 assert!(!user_headers.contains_key("bool-header"));
682 assert_eq!(user_headers.get("string-header").unwrap(), "kept");
683 }
684
685 #[test]
686 fn metadata_combined_properties_and_delivery_info() {
687 let props = BasicProperties::default()
688 .with_app_id("worker-svc".into())
689 .with_content_type("application/json".into());
690
691 let meta = extract_metadata_from_parts(&props, "emails.send", "notifications");
692 let obj = meta.as_object().unwrap();
693
694 assert_eq!(obj.get("app_id").unwrap(), "worker-svc");
695 assert_eq!(obj.get("content_type").unwrap(), "application/json");
696 assert_eq!(obj.get("routing_key").unwrap(), "emails.send");
697 assert_eq!(obj.get("exchange").unwrap(), "notifications");
698 assert!(!obj.contains_key("headers"));
700 }
701
702 #[test]
703 fn config_default_includes_metadata() {
704 let config = LapinAdapterConfig::default();
705 assert!(config.include_default_metadata);
706 assert!(config.metadata_extractor.is_none());
707 }
708
709 #[test]
710 fn config_debug_includes_metadata_fields() {
711 let config = LapinAdapterConfig::default();
712 let debug = format!("{config:?}");
713 assert!(debug.contains("metadata_extractor"));
714 assert!(debug.contains("include_default_metadata"));
715 }
716
717 #[test]
718 fn config_default_has_no_correlation_id_extractor() {
719 let config = LapinAdapterConfig::default();
720 assert!(config.correlation_id_extractor.is_none());
721 assert!(config.max_attempts.is_none());
722 }
723
724 #[test]
725 fn config_debug_includes_new_fields() {
726 let config = LapinAdapterConfig::default();
727 let debug = format!("{config:?}");
728 assert!(debug.contains("correlation_id_extractor"));
729 assert!(debug.contains("max_attempts"));
730 }
731
732 fn parse(json: &str) -> serde_json::Value {
735 serde_json::from_str(json).unwrap()
736 }
737
738 #[test]
739 fn payload_extracts_flat_job_id() {
740 let v = parse(r#"{"job_id": "abc-123", "name": "test"}"#);
741 assert_eq!(
742 extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
743 Some("abc-123".to_owned())
744 );
745 }
746
747 #[test]
748 fn payload_extracts_camel_case_job_id() {
749 let v = parse(r#"{"jobId": "def-456"}"#);
750 assert_eq!(
751 extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
752 Some("def-456".to_owned())
753 );
754 }
755
756 #[test]
757 fn payload_does_not_extract_generic_id() {
758 let v = parse(r#"{"id": "ghi-789", "name": "test"}"#);
759 assert_eq!(
760 extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
761 None
762 );
763 }
764
765 #[test]
766 fn payload_job_id_takes_priority_over_camel_case() {
767 let v = parse(r#"{"job_id": "snake", "jobId": "camel"}"#);
768 assert_eq!(
769 extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
770 Some("snake".to_owned())
771 );
772 }
773
774 #[test]
775 fn payload_extracts_from_externally_tagged_enum() {
776 let v = parse(r#"{"CreateEvent": {"job_id": "019-uuid", "correlation_id": "trace-1"}}"#);
777 assert_eq!(
778 extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
779 Some("019-uuid".to_owned())
780 );
781 }
782
783 #[test]
784 fn payload_tagged_enum_correlation_id() {
785 let v = parse(r#"{"CreateEvent": {"job_id": "019-uuid", "correlation_id": "trace-1"}}"#);
786 assert_eq!(
787 extract_string_field_from_payload(Some(&v), &["correlation_id", "correlationId"]),
788 Some("trace-1".to_owned())
789 );
790 }
791
792 #[test]
793 fn payload_returns_none_for_missing_field() {
794 let v = parse(r#"{"name": "test", "count": 42}"#);
795 assert_eq!(
796 extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
797 None
798 );
799 }
800
801 #[test]
802 fn payload_returns_none_when_no_parsed_value() {
803 assert_eq!(extract_string_field_from_payload(None, &["job_id"]), None);
804 }
805
806 #[test]
807 fn payload_skips_empty_string_values() {
808 let v = parse(r#"{"job_id": "", "jobId": "fallback"}"#);
809 assert_eq!(
810 extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
811 Some("fallback".to_owned())
812 );
813 }
814
815 #[test]
816 fn payload_extracts_numeric_job_id() {
817 let v = parse(r#"{"job_id": 12345}"#);
818 assert_eq!(
819 extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
820 Some("12345".to_owned())
821 );
822 }
823
824 #[test]
825 fn payload_extracts_numeric_zero() {
826 let v = parse(r#"{"job_id": 0}"#);
827 assert_eq!(
828 extract_string_field_from_payload(Some(&v), &["job_id"]),
829 Some("0".to_owned())
830 );
831 }
832
833 #[test]
834 fn payload_extracts_numeric_from_tagged_enum() {
835 let v = parse(r#"{"SendEmail": {"job_id": 42}}"#);
836 assert_eq!(
837 extract_string_field_from_payload(Some(&v), &["job_id"]),
838 Some("42".to_owned())
839 );
840 }
841
842 #[test]
843 fn payload_string_preferred_over_numeric_by_field_order() {
844 let v = parse(r#"{"job_id": 42, "jobId": "string-id"}"#);
846 assert_eq!(
847 extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
848 Some("42".to_owned())
849 );
850 }
851
852 #[test]
853 fn payload_skips_non_string_non_numeric_values() {
854 let v = parse(r#"{"job_id": true, "jobId": "string-id"}"#);
855 assert_eq!(
856 extract_string_field_from_payload(Some(&v), &["job_id", "jobId"]),
857 Some("string-id".to_owned())
858 );
859 }
860
861 #[test]
862 fn payload_does_not_recurse_into_multi_key_objects() {
863 let v = parse(r#"{"key1": {"job_id": "inner"}, "key2": "val"}"#);
864 assert_eq!(
865 extract_string_field_from_payload(Some(&v), &["job_id"]),
866 None
867 );
868 }
869
870 #[test]
871 fn payload_handles_null_correlation_id() {
872 let v = parse(r#"{"CreateEvent": {"job_id": "019", "correlation_id": null}}"#);
873 assert_eq!(
874 extract_string_field_from_payload(Some(&v), &["correlation_id", "correlationId"]),
875 None
876 );
877 }
878}